diff --git a/src/author_identification.py b/src/author_identification.py
deleted file mode 100755
index ff1c894..0000000
--- a/src/author_identification.py
+++ /dev/null
@@ -1,146 +0,0 @@
-import util._hide_sklearn_warnings
-from data.dante_loader import load_latin_corpus, list_authors
-from data.features import *
-from model import AuthorshipVerificator
-from util.evaluation import f1_from_counters
-import argparse
-
-AUTHORS_CORPUS_I = ['Dante', 'ClaraAssisiensis', 'GiovanniBoccaccio', 'GuidoFaba', 'PierDellaVigna']
-AUTHORS_CORPUS_II = ['Dante', 'BeneFlorentinus', 'BenvenutoDaImola', 'BoncompagnoDaSigna', 'ClaraAssisiensis',
-                     'FilippoVillani', 'GiovanniBoccaccio', 'GiovanniDelVirgilio', 'GrazioloBambaglioli', 'GuidoDaPisa',
-                     'GuidoDeColumnis', 'GuidoFaba', 'IacobusDeVaragine', 'IohannesDeAppia', 'IohannesDePlanoCarpini',
-                     'IulianusDeSpira', 'NicolaTrevet', 'PierDellaVigna', 'PietroAlighieri', 'RaimundusLullus',
-                     'RyccardusDeSanctoGermano', 'ZonoDeMagnalis']
-
-
-DEBUG_MODE=True
-
-def main():
-    log = open(args.log, 'wt')
-    discarded = 0
-    f1_scores = []
-    counters = []
-    for i, author in enumerate(args.authors):
-        path = args.corpuspath
-        print('='*80)
-        print(f'Authorship Identification for {author} (complete {i}/{len(args.authors)})')
-        print(f'Corpus {path}')
-        print('-'*80)
-
-        positive, negative, pos_files, neg_files, ep_text = load_latin_corpus(
-            path, positive_author=author, unknown_target=args.unknown
-        )
-        files = np.asarray(pos_files + neg_files)
-        if len(positive) < 2:
-            discarded += 1
-            print(f'discarding analysis for {author} which has only {len(positive)} documents')
-            continue
-
-        n_full_docs = len(positive) + len(negative)
-        print(f'read {n_full_docs} documents from {path}')
-
-        feature_extractor = FeatureExtractor(
-            function_words_freq='latin',
-            conjugations_freq='latin',
-            features_Mendenhall=True,
-            features_sentenceLengths=True,
-            feature_selection_ratio=0.1 if DEBUG_MODE else 1,
-            wordngrams=True, n_wordngrams=(1, 2),
-            charngrams=True, n_charngrams=(3, 4, 5),
-            preserve_punctuation=False,
-            split_documents=True,
-            split_policy=split_by_sentences,
-            window_size=3,
-            normalize_features=True
-        )
-
-        Xtr, ytr, groups = feature_extractor.fit_transform(positive, negative)
-
-        print('Fitting the Verificator')
-        if args.C is None:
-            params = {'C': np.logspace(0, 1, 2)} if DEBUG_MODE else {'C': np.logspace(-3, +3, 7)}
-            C = 1.
-        else:
-            params = None
-            C = args.C
-
-        if args.unknown:
-            av = AuthorshipVerificator(C=C, param_grid=params)
-            av.fit(Xtr, ytr)
-
-            print(f'Checking for the hypothesis that {author} was the author of {args.unknown}')
-            ep, ep_fragments = feature_extractor.transform(ep_text, return_fragments=True, window_size=3)
-            pred, _ = av.predict_proba_with_fragments(ep)
-            tee(f'{args.unknown}: Posterior probability for {author} is {pred:.3f}', log)
-
-        if args.loo:
-            av = AuthorshipVerificator(C=C, param_grid=params)
-            print('Validating the Verificator (Leave-One-Out)')
-            score_ave, score_std, tp, fp, fn, tn = av.leave_one_out(
-                Xtr, ytr, files, groups, test_lowest_index_only=True, counters=True
-            )
-            f1_scores.append(f1_from_counters(tp, fp, fn, tn))
-            counters.append((tp, fp, fn, tn))
-            tee(f'F1 for {author} = {f1_scores[-1]:.3f}', log)
-            print(f'TP={tp} FP={fp} FN={fn} TN={tn}')
-
-    if args.loo:
-        print(f'Computing macro- and micro-averages (discarded {discarded}/{len(args.authors)})')
-        f1_scores = np.array(f1_scores)
-        counters = np.array(counters)
-
-        macro_f1 = f1_scores.mean()
-        micro_f1 = f1_from_counters(*counters.sum(axis=0).tolist())
-
-        tee(f'LOO Macro-F1 = {macro_f1:.3f}', log)
-        tee(f'LOO Micro-F1 = {micro_f1:.3f}', log)
-        print()
-
-    log.close()
-
-    if DEBUG_MODE:
-        print('DEBUG_MODE ON')
-
-
-def tee(msg, log):
-    print(msg)
-    log.write(f'{msg}\n')
-    log.flush()
-
-
-if __name__ == '__main__':
-    import os
-
-    # Training settings
-    parser = argparse.ArgumentParser(description='Authorship verification for Epistola XIII')
-    parser.add_argument('corpuspath', type=str, metavar='CORPUSPATH',
-                        help=f'Path to the directory containing the corpus (documents must be named '
-                             f'_.txt)')
-    parser.add_argument('positive', type=str, default="Dante", metavar='AUTHOR',
-                        help= f'Positive author for the hypothesis (default "Dante"); set to "ALL" to check '
-                              f'every author')
-    parser.add_argument('--loo', default=False, action='store_true',
-                        help='submit each binary classifier to leave-one-out validation')
-    parser.add_argument('--unknown', type=str, metavar='PATH', default=None,
-                        help='path to the file of unknown paternity (default None)')
-    parser.add_argument('--log', type=str, metavar='PATH', default='./results.txt',
-                        help='path to the log file where to write the results (default ./results.txt)')
-    parser.add_argument('--C', type=float, metavar='C', default=None,
-                        help='set the parameter C (trade off between error and margin) or leave as None to optimize')
-
-    args = parser.parse_args()
-
-    if args.positive == 'ALL':
-        args.authors = list_authors(args.corpuspath, skip_prefix='Epistola')
-    else:
-        if (args.positive not in AUTHORS_CORPUS_I) and (args.positive in AUTHORS_CORPUS_II):
-            print(f'warning: author {args.positive} is not in the known list of authors for CORPUS I nor CORPUS II')
-        assert args.positive in list_authors(args.corpuspath, skip_prefix='Epistola'), 'unexpected author'
-        args.authors = [args.positive]
-
-    assert args.unknown or args.loo, 'error: nor an unknown document, nor LOO have been requested. Nothing to do.'
-    assert os.path.exists(args.corpuspath), f'corpus path {args.corpuspath} does not exist'
-    assert args.unknown is None or os.path.exists(args.unknown), '"unknown file" does not exist'
-
-    main()
-
diff --git a/src/author_identification_loo.py b/src/author_identification_loo.py
index 567d286..3349946 100755
--- a/src/author_identification_loo.py
+++ b/src/author_identification_loo.py
@@ -1,111 +1,84 @@
-#import util._hide_sklearn_warnings
-from data.dante_loader import load_latin_corpus, list_authors
+from data.dante_loader import load_latin_corpus
 from data.features import *
-from model import AuthorshipVerificator, RangeFeatureSelector, leave_one_out
+from model import AuthorshipVerificator
+import settings
-from util.evaluation import f1_from_counters
+from util.evaluation import f1_from_counters, leave_one_out
 import argparse
-from sklearn.pipeline import Pipeline
-
-AUTHORS_CORPUS_I = ['Dante', 'ClaraAssisiensis', 'GiovanniBoccaccio', 'GuidoFaba', 'PierDellaVigna']
-AUTHORS_CORPUS_II = ['Dante', 'BeneFlorentinus', 'BenvenutoDaImola', 'BoncompagnoDaSigna', 'ClaraAssisiensis',
-                     'FilippoVillani', 'GiovanniBoccaccio', 'GiovanniDelVirgilio', 'GrazioloBambaglioli', 'GuidoDaPisa',
-                     'GuidoDeColumnis', 'GuidoFaba', 'IacobusDeVaragine', 'IohannesDeAppia', 'IohannesDePlanoCarpini',
-                     'IulianusDeSpira', 'NicolaTrevet', 'PierDellaVigna', 'PietroAlighieri', 'RaimundusLullus',
-                     'RyccardusDeSanctoGermano', 'ZonoDeMagnalis']
-
-
-DEBUG_MODE = True
-
+import pickle
+import helpers
+from helpers import tee
+import os
 
 def main():
     log = open(args.log, 'wt')
     discarded = 0
-    f1_scores = []
-    counters = []
+    f1_scores, acc_scores, counters = [], [], []
+    path = args.corpuspath
+
     for i, author in enumerate(args.authors):
-        path = args.corpuspath
+
         print('='*80)
-        print(f'Authorship Identification for {author} (complete {i}/{len(args.authors)})')
-        print(f'Corpus {path}')
+        print(f'[{args.corpus_name}] Authorship Identification for {author} (complete {i}/{len(args.authors)})')
         print('-'*80)
 
-        positive, negative, pos_files, neg_files, ep_text = load_latin_corpus(path, positive_author=author)
-        files = np.asarray(pos_files + neg_files)
-        if len(positive) < 2:
-            discarded += 1
-            print(f'discarding analysis for {author} which has only {len(positive)} documents')
-            continue
-
-        n_full_docs = len(positive) + len(negative)
-        print(f'read {n_full_docs} documents from {path}')
-
-        feature_extractor = FeatureExtractor(
-            function_words_freq='latin',
-            conjugations_freq='latin',
-            features_Mendenhall=True,
-            features_sentenceLengths=True,
-            feature_selection_ratio=0.05 if DEBUG_MODE else 1,
-            wordngrams=True, n_wordngrams=(1, 2),
-            charngrams=True, n_charngrams=(3, 4, 5),
-            preserve_punctuation=False,
-            split_documents=True,
-            split_policy=split_by_sentences,
-            window_size=3,
-            normalize_features=True
-        )
-
-        Xtr, ytr, groups = feature_extractor.fit_transform(positive, negative)
-
-        print('Fitting the Verificator')
-        #params = {'C': np.logspace(0, 1, 2)} if DEBUG_MODE else {'C': np.logspace(-3, +3, 7)}
-        params = {'C': np.logspace(0, 1, 2)} if DEBUG_MODE else {'C': [1,10,100,1000,0.1,0.01,0.001]}
-
-        slice_charngrams = feature_extractor.feature_range['_cngrams_task']
-        slice_wordngrams = feature_extractor.feature_range['_wngrams_task']
-        if slice_charngrams.start < slice_wordngrams.start:
-            slice_first, slice_second = slice_charngrams, slice_wordngrams
+        pickle_file = f'Corpus{args.corpus_name}.Author{author}.window3.GFS1.pickle'
+        if os.path.exists(pickle_file):
+            print(f'pickle {pickle_file} exists... loading it')
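+            # the cached tuple restores the training matrix, labels, groups, file list,
+            # and the n-gram column slices without re-running the feature extraction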
+            Xtr, ytr, groups, files, frange_chgrams, frange_wograms, fragments_range = pickle.load(open(pickle_file, 'rb'))
         else:
-            slice_first, slice_second = slice_wordngrams, slice_charngrams
-        av = Pipeline([
-            ('featsel_cngrams', RangeFeatureSelector(slice_second, 0.05)),
-            ('featsel_wngrams', RangeFeatureSelector(slice_first, 0.05)),
-            ('av', AuthorshipVerificator(C=1, param_grid=params))
-        ])
+            print(f'pickle {pickle_file} does not exist... generating it')
+            positive, negative, pos_files, neg_files, ep_text = load_latin_corpus(path, positive_author=author)
+            files = np.asarray(pos_files + neg_files)
+            if len(positive) < 2:
+                discarded += 1
+                print(f'discarding analysis for {author} which has only {len(positive)} documents')
+                continue
+
+            n_full_docs = len(positive) + len(negative)
+            print(f'read {n_full_docs} documents from {path}')
+
+            feature_extractor = FeatureExtractor(**settings.config_loo)
+
+            Xtr, ytr, groups = feature_extractor.fit_transform(positive, negative)
+            frange_chgrams = feature_extractor.feature_range['_cngrams_task']
+            frange_wograms = feature_extractor.feature_range['_wngrams_task']
+            fragments_range = feature_extractor.fragments_range
+            pickle.dump((Xtr, ytr, groups, files, frange_chgrams, frange_wograms, fragments_range),
+                        open(pickle_file, 'wb'), pickle.HIGHEST_PROTOCOL)
+
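+        # the char- and word-ngram feature selection is delegated to the verificator (via
+        # feat_selection_slices), so it is re-fit on every LOO training split rather than
+        # being fit once on the full matrix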
+        learner = args.learner.lower()
+        av = AuthorshipVerificator(learner=learner, C=settings.DEFAULT_C, alpha=settings.DEFAULT_ALPHA,
+                                   param_grid=settings.param_grid[learner], class_weight=args.class_weight,
+                                   random_seed=settings.SEED, feat_selection_slices=[frange_chgrams, frange_wograms],
+                                   feat_selection_ratio=args.featsel)
 
         print('Validating the Verificator (Leave-One-Out)')
-        score_ave, score_std, tp, fp, fn, tn = leave_one_out(
-            av, Xtr, ytr, files, groups, test_lowest_index_only=True, counters=True
-        )
-        f1_scores.append(f1_from_counters(tp, fp, fn, tn))
+        accuracy, f1, tp, fp, fn, tn, misclassified = leave_one_out(av, Xtr, ytr, files, groups)
+        acc_scores.append(accuracy)
+        f1_scores.append(f1)
         counters.append((tp, fp, fn, tn))
-        tee(f'F1 for {author} = {f1_scores[-1]:.3f}', log)
-        print(f'TP={tp} FP={fp} FN={fn} TN={tn}')
+
+        tee(f'{author}', log)
+        tee(f'\tF1 = {f1:.3f}', log)
+        tee(f'\tAcc = {accuracy:.3f}', log)
+        tee(f'\tTP={tp} FP={fp} FN={fn} TN={tn}', log)
+        tee(f'\tErrors for {author}: {", ".join(misclassified)}', log)
 
     print(f'Computing macro- and micro-averages (discarded {discarded}/{len(args.authors)})')
-    f1_scores = np.array(f1_scores)
     counters = np.array(counters)
 
-    macro_f1 = f1_scores.mean()
+    acc_mean = np.array(acc_scores).mean()
+    macro_f1 = np.array(f1_scores).mean()
     micro_f1 = f1_from_counters(*counters.sum(axis=0).tolist())
 
     tee(f'LOO Macro-F1 = {macro_f1:.3f}', log)
     tee(f'LOO Micro-F1 = {micro_f1:.3f}', log)
-    print()
+    tee(f'LOO Accuracy = {acc_mean:.3f}', log)
 
     log.close()
 
-    if DEBUG_MODE:
-        print('DEBUG_MODE ON')
-
-
-def tee(msg, log):
-    print(msg)
-    log.write(f'{msg}\n')
-    log.flush()
-
 
 if __name__ == '__main__':
-    import os
 
     # Training settings
     parser = argparse.ArgumentParser(description='Authorship verification for MedLatin '
@@ -117,20 +90,23 @@ if __name__ == '__main__':
                         help= f'Positive author for the hypothesis (default "Dante"); set to "ALL" to check '
                               f'every author')
     parser.add_argument('--log', type=str, metavar='PATH', default=None,
-                        help='path to the log file where to write the results '
-                             '(if not specified, then ./results_{corpuspath.name})')
+                        help='path to the log file where to write the results (if not specified, then the name is '
+                             'automatically generated from the arguments and stored in ../results/)')
+    parser.add_argument('--featsel', default=0.1, metavar='FEAT_SEL_RATIO',
+                        help='feature selection ratio for char- and word-ngrams')
+    parser.add_argument('--class_weight', type=str, default=settings.CLASS_WEIGHT, metavar='CLASS_WEIGHT',
+                        help="whether or not to reweight classes' importance")
+    parser.add_argument('--learner', type=str, default='lr', metavar='LEARNER',
+                        help='classification learner (lr, svm, mnb)')
 
     args = parser.parse_args()
 
-    if args.positive == 'ALL':
-        args.authors = list_authors(args.corpuspath, skip_prefix='Epistola')
-    else:
-        if (args.positive not in AUTHORS_CORPUS_I) and (args.positive in AUTHORS_CORPUS_II):
-            print(f'warning: author {args.positive} is not in the known list of authors for CORPUS I nor CORPUS II')
-        assert args.positive in list_authors(args.corpuspath, skip_prefix='Epistola'), 'unexpected author'
-        args.authors = [args.positive]
-
-    assert os.path.exists(args.corpuspath), f'corpus path {args.corpuspath} does not exist'
+    helpers.check_author(args)
+    helpers.check_feat_sel_range(args)
+    helpers.check_class_weight(args)
+    helpers.check_corpus_path(args)
+    helpers.check_learner(args)
+    helpers.check_log_loo(args)
 
     main()
diff --git a/src/author_identification_unknown.py b/src/author_identification_unknown.py
index ff1c894..5d7b25c 100755
--- a/src/author_identification_unknown.py
+++ b/src/author_identification_unknown.py
@@ -1,146 +1,101 @@
-import util._hide_sklearn_warnings
 from data.dante_loader import load_latin_corpus, list_authors
 from data.features import *
 from model import AuthorshipVerificator
-from util.evaluation import f1_from_counters
+import settings
 import argparse
+import helpers
+from helpers import tee
+import os
+import pickle
 
-AUTHORS_CORPUS_I = ['Dante', 'ClaraAssisiensis', 'GiovanniBoccaccio', 'GuidoFaba', 'PierDellaVigna']
-AUTHORS_CORPUS_II = ['Dante', 'BeneFlorentinus', 'BenvenutoDaImola', 'BoncompagnoDaSigna', 'ClaraAssisiensis',
-                     'FilippoVillani', 'GiovanniBoccaccio', 'GiovanniDelVirgilio', 'GrazioloBambaglioli', 'GuidoDaPisa',
-                     'GuidoDeColumnis', 'GuidoFaba', 'IacobusDeVaragine', 'IohannesDeAppia', 'IohannesDePlanoCarpini',
-                     'IulianusDeSpira', 'NicolaTrevet', 'PierDellaVigna', 'PietroAlighieri', 'RaimundusLullus',
-                     'RyccardusDeSanctoGermano', 'ZonoDeMagnalis']
-
-
-DEBUG_MODE=True
 
 def main():
     log = open(args.log, 'wt')
+
     discarded = 0
-    f1_scores = []
-    counters = []
+    path = args.corpuspath
+
     for i, author in enumerate(args.authors):
-        path = args.corpuspath
         print('='*80)
-        print(f'Authorship Identification for {author} (complete {i}/{len(args.authors)})')
-        print(f'Corpus {path}')
+        print(f'[{args.corpus_name}] Authorship Identification for {author} (complete {i}/{len(args.authors)})')
         print('-'*80)
 
-        positive, negative, pos_files, neg_files, ep_text = load_latin_corpus(
-            path, positive_author=author, unknown_target=args.unknown
-        )
-        files = np.asarray(pos_files + neg_files)
-        if len(positive) < 2:
-            discarded += 1
-            print(f'discarding analysis for {author} which has only {len(positive)} documents')
-            continue
-
-        n_full_docs = len(positive) + len(negative)
-        print(f'read {n_full_docs} documents from {path}')
-
-        feature_extractor = FeatureExtractor(
-            function_words_freq='latin',
-            conjugations_freq='latin',
-            features_Mendenhall=True,
-            features_sentenceLengths=True,
-            feature_selection_ratio=0.1 if DEBUG_MODE else 1,
-            wordngrams=True, n_wordngrams=(1, 2),
-            charngrams=True, n_charngrams=(3, 4, 5),
-            preserve_punctuation=False,
-            split_documents=True,
-            split_policy=split_by_sentences,
-            window_size=3,
-            normalize_features=True
-        )
-
-        Xtr, ytr, groups = feature_extractor.fit_transform(positive, negative)
-
-        print('Fitting the Verificator')
-        if args.C is None:
-            params = {'C': np.logspace(0, 1, 2)} if DEBUG_MODE else {'C': np.logspace(-3, +3, 7)}
-            C = 1.
+        pickle_file = f'Corpus{args.corpus_name}.Author{author}.window3.GFS{args.featsel}.unk{args.unknown_name}.pickle'
+        if os.path.exists(pickle_file):
+            print(f'pickle {pickle_file} exists... loading it')
+            Xtr, ytr, groups, files, frange_chgrams, frange_wograms, fragments_range, ep, ep_fragments = \
+                pickle.load(open(pickle_file, 'rb'))
         else:
-            params = None
-            C = args.C
+            print(f'pickle {pickle_file} does not exist... generating it')
+            positive, negative, pos_files, neg_files, ep_text = \
+                load_latin_corpus(path, positive_author=author, unknown_target=args.unknown)
+            files = np.asarray(pos_files + neg_files)
+            if len(positive) < 2:
+                discarded += 1
+                print(f'discarding analysis for {author} which has only {len(positive)} documents')
+                continue
 
-        if args.unknown:
-            av = AuthorshipVerificator(C=C, param_grid=params)
-            av.fit(Xtr, ytr)
+            n_full_docs = len(positive) + len(negative)
+            print(f'read {n_full_docs} documents from {path}')
+
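+            # unlike the LOO script (which keeps the extractor's selection ratio at 1 and
+            # selects features inside the verificator), here the n-gram selection is
+            # applied globally by the feature extractor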
+            settings.config_unk['feature_selection_ratio'] = args.featsel
+            feature_extractor = FeatureExtractor(**settings.config_unk)
+
+            Xtr, ytr, groups = feature_extractor.fit_transform(positive, negative)
+            frange_chgrams = feature_extractor.feature_range['_cngrams_task']
+            frange_wograms = feature_extractor.feature_range['_wngrams_task']
+            fragments_range = feature_extractor.fragments_range
 
-            print(f'Checking for the hypothesis that {author} was the author of {args.unknown}')
             ep, ep_fragments = feature_extractor.transform(ep_text, return_fragments=True, window_size=3)
-            pred, _ = av.predict_proba_with_fragments(ep)
-            tee(f'{args.unknown}: Posterior probability for {author} is {pred:.3f}', log)
 
-        if args.loo:
-            av = AuthorshipVerificator(C=C, param_grid=params)
-            print('Validating the Verificator (Leave-One-Out)')
-            score_ave, score_std, tp, fp, fn, tn = av.leave_one_out(
-                Xtr, ytr, files, groups, test_lowest_index_only=True, counters=True
-            )
-            f1_scores.append(f1_from_counters(tp, fp, fn, tn))
-            counters.append((tp, fp, fn, tn))
-            tee(f'F1 for {author} = {f1_scores[-1]:.3f}', log)
-            print(f'TP={tp} FP={fp} FN={fn} TN={tn}')
+            pickle.dump((Xtr, ytr, groups, files, frange_chgrams, frange_wograms, fragments_range, ep, ep_fragments),
+                        open(pickle_file, 'wb'), pickle.HIGHEST_PROTOCOL)
 
-        if args.loo:
-            print(f'Computing macro- and micro-averages (discarded {discarded}/{len(args.authors)})')
-            f1_scores = np.array(f1_scores)
-            counters = np.array(counters)
+        learner = args.learner.lower()
+        av = AuthorshipVerificator(learner=learner, C=settings.DEFAULT_C, alpha=settings.DEFAULT_ALPHA,
+                                   param_grid=settings.param_grid[learner], class_weight=args.class_weight,
+                                   random_seed=settings.SEED)
 
-        macro_f1 = f1_scores.mean()
-        micro_f1 = f1_from_counters(*counters.sum(axis=0).tolist())
+        av.fit(Xtr, ytr, groups)
 
-        tee(f'LOO Macro-F1 = {macro_f1:.3f}', log)
-        tee(f'LOO Micro-F1 = {micro_f1:.3f}', log)
-        print()
+        print(f'Checking for the hypothesis that {author} was the author of {args.unknown_name}')
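+        # row 0 of the probability matrix is the full document (its fragments follow);
+        # column 1 is the probability of the positive class, i.e. of the candidate author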
+        pred = av.predict_proba(ep)
+        pred = pred[0, 1]
+        tee(f'{args.unknown}: Posterior probability for {author} is {pred:.4f}', log)
 
     log.close()
 
-    if DEBUG_MODE:
-        print('DEBUG_MODE ON')
-
-
-def tee(msg, log):
-    print(msg)
-    log.write(f'{msg}\n')
-    log.flush()
-
 
 if __name__ == '__main__':
-    import os
 
     # Training settings
-    parser = argparse.ArgumentParser(description='Authorship verification for Epistola XIII')
+    parser = argparse.ArgumentParser(description='Authorship verification for a text of unknown paternity')
     parser.add_argument('corpuspath', type=str, metavar='CORPUSPATH',
                         help=f'Path to the directory containing the corpus (documents must be named '
                              f'_.txt)')
     parser.add_argument('positive', type=str, default="Dante", metavar='AUTHOR',
                         help= f'Positive author for the hypothesis (default "Dante"); set to "ALL" to check '
                               f'every author')
-    parser.add_argument('--loo', default=False, action='store_true',
-                        help='submit each binary classifier to leave-one-out validation')
-    parser.add_argument('--unknown', type=str, metavar='PATH', default=None,
+    parser.add_argument('unknown', type=str, metavar='PATH', default=None,
                         help='path to the file of unknown paternity (default None)')
-    parser.add_argument('--log', type=str, metavar='PATH', default='./results.txt',
-                        help='path to the log file where to write the results (default ./results.txt)')
-    parser.add_argument('--C', type=float, metavar='C', default=None,
-                        help='set the parameter C (trade off between error and margin) or leave as None to optimize')
+    parser.add_argument('--log', type=str, metavar='PATH', default=None,
+                        help='path to the log file where to write the results (if not specified, then the name is '
+                             'automatically generated from the arguments and stored in ../results/)')
+    parser.add_argument('--featsel', default=0.1, metavar='FEAT_SEL_RATIO',
+                        help='feature selection ratio for char- and word-ngrams')
+    parser.add_argument('--class_weight', type=str, default=settings.CLASS_WEIGHT, metavar='CLASS_WEIGHT',
+                        help="whether or not to reweight classes' importance")
+    parser.add_argument('--learner', type=str, default='LR', metavar='LEARNER',
+                        help='classification learner (LR, SVM, MNB)')
 
     args = parser.parse_args()
 
-    if args.positive == 'ALL':
-        args.authors = list_authors(args.corpuspath, skip_prefix='Epistola')
-    else:
-        if (args.positive not in AUTHORS_CORPUS_I) and (args.positive in AUTHORS_CORPUS_II):
-            print(f'warning: author {args.positive} is not in the known list of authors for CORPUS I nor CORPUS II')
-        assert args.positive in list_authors(args.corpuspath, skip_prefix='Epistola'), 'unexpected author'
-        args.authors = [args.positive]
-
-    assert args.unknown or args.loo, 'error: nor an unknown document, nor LOO have been requested. Nothing to do.'
-    assert os.path.exists(args.corpuspath), f'corpus path {args.corpuspath} does not exist'
-    assert args.unknown is None or os.path.exists(args.unknown), '"unknown file" does not exist'
+    helpers.check_author(args)
+    helpers.check_feat_sel_range(args)
+    helpers.check_class_weight(args)
+    helpers.check_corpus_path(args)
+    helpers.check_learner(args)
+    helpers.check_log_unknown(args)
 
     main()
diff --git a/src/data/features.py b/src/data/features.py
index f1bef2f..c16af31 100755
--- a/src/data/features.py
+++ b/src/data/features.py
@@ -372,8 +372,8 @@ class FeatureExtractor:
     def fit_transform(self, positives, negatives):
         documents = positives + negatives
         authors = [1]*len(positives) + [0]*len(negatives)
-        n_original_docs = len(documents)
-        groups = list(range(n_original_docs))
+        self.n_original_docs = len(documents)
+        groups = list(range(self.n_original_docs))
 
         if self.split_documents:
             doc_fragments, authors_fragments, groups_fragments = splitter(
@@ -383,7 +383,7 @@ class FeatureExtractor:
             authors.extend(authors_fragments)
             groups.extend(groups_fragments)
             self._print(f'splitting documents: {len(doc_fragments)} segments + '
-                        f'{n_original_docs} documents = '
+                        f'{self.n_original_docs} documents = '
                         f'{len(documents)} total')
 
         # represent the target vector
@@ -398,10 +398,11 @@ class FeatureExtractor:
             f'features_Mendenhall={self.features_Mendenhall} tfidf={self.wngrams} '
             f'split_documents={self.split_documents}, split_policy={self.split_policy.__name__}'
         )
-        print(f'number of training (full) documents: {n_original_docs}')
+        print(f'number of training (full) documents: {self.n_original_docs}')
         print(f'y prevalence: {y.sum()}/{len(y)} {y.mean() * 100:.2f}%')
         print()
 
+        self.fragments_range = slice(self.n_original_docs, len(y))
         return X, y, groups
 
     def transform(self, test, return_fragments=False, window_size=-1, avoid_splitting=False):
@@ -520,11 +521,10 @@ class FeatureExtractor:
         else:
             X = self._addfeatures(_tocsr(X), out['features'], taskname, out['f_names'] if fit else None)
             if fit:
-                vectorizer, selector = out['vectorizer'], out['selector']
-                if taskname == '_wngrams_task' and self.wngrams_vectorizer is None:
-                    self.wngrams_vectorizer, self.wngrams_selector = vectorizer, selector
-                elif taskname == '_cngrams_task' and self.cngrams_vectorizer is None:
-                    self.cngrams_vectorizer, self.cngrams_selector = vectorizer, selector
+                if taskname == '_wngrams_task':
+                    self.wngrams_vectorizer, self.wngrams_selector = out['vectorizer'], out['selector']
+                elif taskname == '_cngrams_task':
+                    self.cngrams_vectorizer, self.cngrams_selector = out['vectorizer'], out['selector']
 
         if fit:
             self.feature_names = np.asarray(self.feature_names)
diff --git a/src/helpers.py b/src/helpers.py
new file mode 100644
index 0000000..9ce7dd0
--- /dev/null
+++ b/src/helpers.py
@@ -0,0 +1,62 @@
+import settings
+from data.dante_loader import list_authors
+import os
+import pathlib
+
+
+def tee(msg, log):
+    print(msg)
+    log.write(f'{msg}\n')
+    log.flush()
+
+
+def check_author(args):
+    if args.positive == 'ALL':
+        args.authors = list_authors(args.corpuspath, skip_prefix='Epistola')
+    else:
+        if (args.positive not in settings.AUTHORS_CORPUS_I) and (args.positive not in settings.AUTHORS_CORPUS_II):
+            print(f'warning: author {args.positive} is not in the known list of authors for CORPUS I nor CORPUS II')
+        assert args.positive in list_authors(args.corpuspath, skip_prefix='Epistola'), 'unexpected author'
+        args.authors = [args.positive]
+
+
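+# --featsel accepts either a ratio in (0, 1] (passed with a decimal point, e.g. 0.1)
+# or an absolute number of features to keep per range (passed as an integer)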
+def check_feat_sel_range(args):
+    if not isinstance(args.featsel, float):
+        if isinstance(args.featsel, str) and '.' in args.featsel:
+            args.featsel = float(args.featsel)
+        else:
+            args.featsel = int(args.featsel)
+    if isinstance(args.featsel, float):
+        assert 0 < args.featsel <= 1, 'feature selection ratio out of range'
+
+
+def check_class_weight(args):
+    assert args.class_weight in ['balanced', 'none', 'None']
+    if args.class_weight.lower() == 'none':
+        args.class_weight = None
+
+
+def check_corpus_path(args):
+    assert os.path.exists(args.corpuspath), f'corpus path {args.corpuspath} does not exist'
+    args.corpus_name = pathlib.Path(args.corpuspath).name
+
+
+def check_learner(args):
+    assert args.learner.lower() in settings.param_grid.keys(), \
+        f'unknown learner, use any in {settings.param_grid.keys()}'
+
+
+def check_log_loo(args):
+    if args.log is None:
+        os.makedirs('../results', exist_ok=True)
+        args.log = f'../results/LOO_Corpus{args.corpus_name}.Author{args.positive}.' \
+                   f'fs{args.featsel}.classweight{str(args.class_weight)}.CLS{args.learner}.txt'
+
+
+def check_log_unknown(args):
+    assert os.path.exists(args.unknown), f'file {args.unknown} does not exist'
+    args.unknown_name = pathlib.Path(args.unknown).name
+    if args.log is None:
+        os.makedirs('../results', exist_ok=True)
+        args.log = f'../results/Unknown{args.unknown_name}_Corpus{args.corpus_name}.Author{args.positive}.' \
+                   f'fs{args.featsel}.classweight{str(args.class_weight)}.CLS{args.learner}.txt'
\ No newline at end of file
diff --git a/src/model.py b/src/model.py
index e768fe1..2f0a125 100755
--- a/src/model.py
+++ b/src/model.py
@@ -1,117 +1,152 @@
 from sklearn.base import BaseEstimator, TransformerMixin
 from sklearn.metrics import make_scorer
-from sklearn.model_selection import GridSearchCV, LeaveOneOut, LeaveOneGroupOut, cross_val_score, StratifiedKFold
+from sklearn.model_selection import GridSearchCV, GroupKFold
 from sklearn.linear_model import LogisticRegression
+from sklearn.naive_bayes import MultinomialNB
+from sklearn.svm import LinearSVC
 from data.features import *
-from util.evaluation import f1, get_counters
+from util.evaluation import f1_metric
+from typing import List, Union
 
 
 class AuthorshipVerificator(BaseEstimator):
 
-    def __init__(self,
-                 nfolds=10,
-                 param_grid={'C': np.logspace(-4, +3, 8)},
-                 C=1.,
-                 author_name=None):
+    def __init__(self, nfolds=10, param_grid=None, learner=None, C=1., alpha=0.001, class_weight='balanced',
+                 random_seed=41, feat_selection_slices=None, feat_selection_ratio=1):
         self.nfolds = nfolds
         self.param_grid = param_grid
+        self.learner = learner
         self.C = C
-        self.author_name = author_name
+        self.alpha = alpha
+        self.class_weight = class_weight
+        self.random_seed = random_seed
+        self.feat_selection_slices = feat_selection_slices
+        self.feat_selection_ratio = feat_selection_ratio
+
+    def fit(self, X, y, groups=None, hyperparam_optimization=True):
+        if self.param_grid is None and hyperparam_optimization:
+            raise ValueError('Param grid is None, but hyperparameter optimization is requested')
+
+        if self.feat_selection_slices is not None:
+            self.fs = MultiRangeFeatureSelector(self.feat_selection_slices, feat_sel=self.feat_selection_ratio)
+            X = self.fs.fit(X, y).transform(X)
+
+        if self.learner == 'lr':
+            self.classifier = LogisticRegression(
+                C=self.C, class_weight=self.class_weight, max_iter=1000, random_state=self.random_seed, solver='lbfgs'
+            )
+        elif self.learner == 'svm':
+            self.classifier = LinearSVC(C=self.C, class_weight=self.class_weight)
+        elif self.learner == 'mnb':
+            self.classifier = MultinomialNB(alpha=self.alpha)
 
-    def fit(self, X, y):
-        self.classifier = LogisticRegression(C=self.C, class_weight='balanced', solver='lbfgs', max_iter=1000)
         y = np.asarray(y)
         positive_examples = y.sum()
-        if positive_examples >= self.nfolds and self.param_grid is not None:
-            print('optimizing {}'.format(self.classifier.__class__.__name__))
-            folds = list(StratifiedKFold(n_splits=self.nfolds, shuffle=True, random_state=42).split(X, y))
+
+        if groups is None:
+            groups = np.arange(len(y))
+
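+        # grouped CV keeps every document and its fragments in the same fold, so no
+        # fragment of a held-out document can leak into training; the grid search also
+        # requires positives coming from more than one document group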
+        if hyperparam_optimization and (positive_examples >= self.nfolds) and (len(np.unique(groups[y==1])) > 1):
+            folds = list(GroupKFold(n_splits=self.nfolds).split(X, y, groups))
             self.estimator = GridSearchCV(
-                self.classifier, param_grid=self.param_grid, cv=folds, scoring=make_scorer(f1), n_jobs=-1
+                self.classifier, param_grid=self.param_grid, cv=folds, scoring=make_scorer(f1_metric), n_jobs=-1,
+                refit=True, error_score=0
             )
         else:
+            print('insufficient positive examples or document groups for grid-search; using default classifier')
             self.estimator = self.classifier
 
         self.estimator.fit(X, y)
 
         if isinstance(self.estimator, GridSearchCV):
             f1_mean = self.estimator.best_score_.mean()
-            print(f'Best params: {self.estimator.best_params_} (cross-validation F1={f1_mean:.3f})')
-            self.estimator = self.estimator.best_estimator_
+            self.chosen_params_ = self.estimator.best_params_
+            print(f'Best params: {self.chosen_params_} (cross-validation F1={f1_mean:.3f})')
+        else:
+            self.chosen_params_ = {'C': self.C, 'alpha': self.alpha}
 
         return self
 
-    def predict_with_fragments(self, test):
-        pred = self.estimator.predict(test)
-        full_doc_prediction = pred[0]
-        if len(pred) > 1:
-            fragment_predictions = pred[1:]
-            print('fragments average {:.3f}, array={}'.format(fragment_predictions.mean(), fragment_predictions))
-            return full_doc_prediction, fragment_predictions
-        return full_doc_prediction
-
     def predict(self, test):
+        if self.feat_selection_slices is not None:
+            test = self.fs.transform(test)
         return self.estimator.predict(test)
 
-    def predict_proba_with_fragments(self, test):
-        assert hasattr(self, 'predict_proba'), 'the classifier is not calibrated'
-        pred = self.estimator.predict_proba(test)
-        full_doc_prediction = pred[0,1]
-        if len(pred) > 1:
-            fragment_predictions = pred[1:,1]
-            print('fragments average {:.3f}, array={}'.format(fragment_predictions.mean(), fragment_predictions))
-            return full_doc_prediction, fragment_predictions
-        return full_doc_prediction, []
-
     def predict_proba(self, test):
         assert hasattr(self, 'predict_proba'), 'the classifier is not calibrated'
-        return self.estimator.predict_proba(test)
-
-
-def leave_one_out(model, X, y, files, groups=None, test_lowest_index_only=True, counters=False):
-    if groups is None:
-        print(f'Computing LOO without groups over {X.shape[0]} documents')
-        folds = list(LeaveOneOut().split(X, y))
-    else:
-        print(f'Computing LOO with groups over {X.shape[0]} documents')
-        logo = LeaveOneGroupOut()
-        folds = list(logo.split(X, y, groups))
-        if test_lowest_index_only:
-            print('ignoring fragments')
-            folds = [(train, np.min(test, keepdims=True)) for train, test in folds]
-
-    print(f'optimizing via grid search each o the {len(folds)} prediction problems')
-    scores = cross_val_score(model, X, y, cv=folds, scoring=make_scorer(f1), n_jobs=-1, verbose=10)
-    missclassified = files[scores == 0].tolist()
-    #if hasattr(self.estimator, 'predict_proba') and len(missclassified) > 0:
-    #    missclassified_prob = self.estimator.predict_proba(csr_matrix(X)[scores == 0])[:, 1]
-    #    missclassified_prob = missclassified_prob.flatten().tolist()
-    #    missclassified = [f'{file} Pr={prob:.3f}' for file, prob in zip(missclassified,missclassified_prob)]
-    print('missclassified texts:')
-    print('\n'.join(missclassified))
-
-    if counters and test_lowest_index_only:
-        yfull_true = y[:len(folds)]
-        yfull_predict = np.zeros_like(yfull_true)
-        yfull_predict[scores == 1] = yfull_true[scores == 1]
-        yfull_predict[scores != 1] = 1-yfull_true[scores != 1]
-        tp, fp, fn, tn = get_counters(yfull_true, yfull_predict)
-        return scores.mean(), scores.std(), tp, fp, fn, tn
-    else:
-        return scores.mean(), scores.std()
+        if self.feat_selection_slices is not None:
+            test = self.fs.transform(test)
+        prob = self.estimator.predict_proba(test)
+        return prob
 
 
 class RangeFeatureSelector(BaseEstimator, TransformerMixin):
-    def __init__(self, range: slice, feat_sel_ratio: float):
+
+    def __init__(self, range: slice, feat_sel: Union[float, int]):
         self.range = range
-        self.feat_sel_ratio = feat_sel_ratio
+        self.feat_sel = feat_sel
 
     def fit(self, X, y):
         nF = self.range.stop-self.range.start
-        num_feats = int(self.feat_sel_ratio * nF)
+        if isinstance(self.feat_sel, int) and self.feat_sel > 0:
+            num_feats = self.feat_sel
+        elif isinstance(self.feat_sel, float) and 0. <= self.feat_sel <= 1.:
+            num_feats = int(self.feat_sel * nF)
+        else:
+            raise ValueError('feat_sel should be a positive integer or a float in [0,1]')
         self.selector = SelectKBest(chi2, k=num_feats)
         self.selector.fit(X[:,self.range], y)
         return self
 
     def transform(self, X):
         Z = self.selector.transform(X[:,self.range])
-        return csr_matrix(hstack([X[:,:self.range.start], Z, X[:,self.range.stop:]]))
+        normalize(Z, norm='l2', copy=False)
+        X = csr_matrix(hstack([X[:,:self.range.start], Z, X[:,self.range.stop:]]))
+        return X
+
+
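+# applies RangeFeatureSelector to several disjoint column ranges; the ranges are
+# processed from the rightmost to the leftmost so that dropping columns in one
+# range does not shift the indices of the ranges still to be transformed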
+class MultiRangeFeatureSelector(BaseEstimator, TransformerMixin):
+    def __init__(self, ranges: List[slice], feat_sel: Union[float, int]):
+        self.ranges = ranges
+        self.feat_sel = feat_sel
+
+    def fit(self, X, y):
+        assert isinstance(self.ranges, list), 'ranges should be a list of slices'
+        assert self.__check_ranges_collisions(self.ranges), 'the feature ranges overlap'
+        self.ranges = self.__sort_ranges(self.ranges)
+        self.selectors = [RangeFeatureSelector(r, self.feat_sel).fit(X, y) for r in self.ranges]
+        return self
+
+    def transform(self, X):
+        for selector in self.selectors:
+            X = selector.transform(X)
+        return X
+
+    def __check_ranges_collisions(self, ranges: List[slice]):
+        for i, range_i in enumerate(ranges):
+            for j, range_j in enumerate(ranges):
+                if i == j: continue
+                if range_i.start <= range_j.start < range_i.stop: return False
+                if range_i.start < range_j.stop <= range_i.stop: return False
+        return True
+
+    def __sort_ranges(self, ranges: List[slice]):
+        return np.asarray(ranges)[np.argsort([r.start for r in ranges])[::-1]]
+
+
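+# reshuffles the documents until every GroupKFold training split contains both
+# classes; GroupKFold is deterministic, so only a permutation can change the folds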
+def get_valid_folds(nfolds, X, y, groups, max_trials=100):
+    trials = 0
+    folds = list(GroupKFold(n_splits=nfolds).split(X, y, groups))
+    n_docs = len(y)
+    print(f'different classes={np.unique(y)}; #different documents={len(np.unique(groups))} positives={len(np.unique(groups[y==1]))}')
+    while any(len(np.unique(y[train])) < 2 for train, test in folds):
+        shuffle_index = np.random.permutation(n_docs)
+        X, y, groups = X[shuffle_index], y[shuffle_index], groups[shuffle_index]
+        folds = list(GroupKFold(n_splits=nfolds).split(X, y, groups))
+        print(f'\ttrial {trials}: {[len(np.unique(y[train])) for train, test in folds]}')
+        trials += 1
+        if trials > max_trials:
+            raise ValueError(f'could not meet condition after {max_trials} trials')
+    return folds
diff --git a/src/settings.py b/src/settings.py
new file mode 100644
index 0000000..3ca4fc4
--- /dev/null
+++ b/src/settings.py
@@ -0,0 +1,67 @@
+import numpy as np
+from data.features import split_by_sentences
+
+AUTHORS_CORPUS_I = [
+    'Dante',
+    'ClaraAssisiensis',
+    'GiovanniBoccaccio',
+    'GuidoFaba',
+    'PierDellaVigna'
+]
+
+AUTHORS_CORPUS_II = [
+    'Dante',
+    'BeneFlorentinus',
+    'BenvenutoDaImola',
+    'BoncompagnoDaSigna',
+    'ClaraAssisiensis',
+    'FilippoVillani',
+    'GiovanniBoccaccio',
+    'GiovanniDelVirgilio',
+    'GrazioloBambaglioli',
+    'GuidoDaPisa',
+    'GuidoDeColumnis',
+    'GuidoFaba',
+    'IacobusDeVaragine',
+    'IohannesDeAppia',
+    'IohannesDePlanoCarpini',
+    'IulianusDeSpira',
+    'NicolaTrevet',
+    'PierDellaVigna',
+    'PietroAlighieri',
+    'RaimundusLullus',
+    'RyccardusDeSanctoGermano',
+    'ZonoDeMagnalis'
+]
+
+DEFAULT_C = 0.1
+DEFAULT_ALPHA = 0.001
+CLASS_WEIGHT = 'balanced'
+SEED = 1
+
+grid_C = np.logspace(-3, 3, 7)
+param_grid = {
+    'lr': {'C': grid_C},
+    'svm': {'C': grid_C},
+    'mnb': {'alpha': np.logspace(-7, -1, 7)}
+}
+
+config_loo = {
+    'function_words_freq': 'latin',
+    'conjugations_freq': 'latin',
+    'features_Mendenhall': True,
+    'features_sentenceLengths': True,
+    'feature_selection_ratio': 1,
+    'wordngrams': True,
+    'n_wordngrams': (1, 2),
+    'charngrams': True,
+    'n_charngrams': (3, 4, 5),
+    'preserve_punctuation': False,
+    'split_documents': True,
+    'split_policy': split_by_sentences,
+    'window_size': 3,
+    'normalize_features': True
+}
+
+config_unk = config_loo.copy()
+config_unk['feature_selection_ratio'] = 0.1
\ No newline at end of file
diff --git a/src/util/evaluation.py b/src/util/evaluation.py
index 8e0d1e0..8c79bd1 100644
--- a/src/util/evaluation.py
+++ b/src/util/evaluation.py
@@ -1,4 +1,8 @@
 import numpy as np
+from scipy.sparse import csr_matrix
+from sklearn.model_selection import LeaveOneGroupOut
+from tqdm import tqdm
+from joblib import Parallel, delayed
 
 
 def get_counters(true_labels, predicted_labels):
@@ -19,6 +23,42 @@ def f1_from_counters(tp, fp, fn, tn):
         return 1.0
 
 
-def f1(true_labels, predicted_labels):
+def f1_metric(true_labels, predicted_labels):
     tp, fp, fn, tn = get_counters(true_labels,predicted_labels)
-    return f1_from_counters(tp, fp, fn, tn)
\ No newline at end of file
+    return f1_from_counters(tp, fp, fn, tn)
+
+
+def leave_one_out(model, X, y, files, groups):
+    print(f'Computing LOO with groups over {X.shape[0]} documents')
+    logo = LeaveOneGroupOut()
+
+    # Fragments are ignored in the test; only full documents are evaluated.
+    # The index of the full document is the lowest index in its group
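+    # (fit_transform numbers the full documents 0..n_docs-1 and gives each fragment the
+    # group id of its source document, so the group's lowest index is the full text)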
+    folds = [(train, np.min(test, keepdims=True)) for train, test in logo.split(X, y, groups)]
+
+    def _classify_held_out(train, test, X, y, model):
+        X = csr_matrix(X)
+        # hyperparam_optim = (len(np.unique(groups[y[train] == 1])) > 2)
+        model.fit(X[train], y[train], groups[train])  # , hyperparam_optim=hyperparam_optim)
+        y_pred = model.predict(X[test]).item()
+        score = (y_pred == y[test]).item()
+        return y_pred, score
+
+    predictions_scores = Parallel(n_jobs=-1)(
+        delayed(_classify_held_out)(train, test, X, y, model) for train, test in folds
+        #tqdm(
+        #    folds, desc=f'optimizing via grid search each of the {len(folds)} prediction problems'
+        #)
+    )
+    predictions = np.asarray([p for p, s in predictions_scores])
+    scores = np.asarray([s for p, s in predictions_scores])
+
+    misclassified = files[scores == 0].tolist()
+
+    yfull_true = y[:len(folds)]
+    tp, fp, fn, tn = get_counters(yfull_true, predictions)
+    f1 = f1_from_counters(tp, fp, fn, tn)
+    acc = scores.mean()
+
+    return acc, f1, tp, fp, fn, tn, misclassified
\ No newline at end of file