import pandas as pd
import spacy
from spacy.util import minibatch, compounding
import regex as re
import pickle
from pprint import pprint
from common.utils import solve_overlap, check_overlap
import warnings
import jsonlines
import json
import random

COMMENTARIES_PATH = './commentaries/'
DF_COMMENTARIES_PATH = './commentaries/data_parsed/'

df_commentary_monarchia = pd.read_csv(DF_COMMENTARIES_PATH + 'monarchia_DF.csv')
df_ner_unique = pd.read_csv(DF_COMMENTARIES_PATH + 'ner_unique_monarchia.csv')

"""
At the moment, df_ner_unique contains only the terms found in "De Monarchia".
The .csv file should contain all occurrences of tagged terms across all of the
(tagged) documents!
"""


class DataSetBuilder:

    SPACY_MODEL_STD = 'it_core_news_sm'

    def __init__(self, commentaries_tr, commentaries_eva, NER=None):
        self._commentaries_df = commentaries_tr
        self._commentaries_eva = commentaries_eva
        self.commentaries = None
        self.clean_commentaries = None
        self.commentaries_eva = None
        self.clean_commentaries_eva = None
        self.ner_lookup = None
        self.ner_clean_lookup = None
        self._NER = NER
        self.TRAIN_DATA = None
        self.get_commentaries()

    def get_commentaries(self):
        commentaries_tr = self._commentaries_df['comment'].unique()
        commentaries_ev = self._commentaries_eva['comment'].unique()

        clean_commentaries = []
        for commentary in [commentaries_tr, commentaries_ev]:
            commentary_list = []
            for comment in commentary:
                # Strip the markers around tagged keywords to obtain the clean text.
                cleaned = comment.replace('', '')
                cleaned = cleaned.replace('', '')
                commentary_list.append(cleaned)
            clean_commentaries.append(commentary_list)

        self.commentaries = commentaries_tr
        self.clean_commentaries = clean_commentaries[0]
        self.commentaries_eva = commentaries_ev
        self.clean_commentaries_eva = clean_commentaries[1]
        return commentaries_tr, commentaries_ev, clean_commentaries

    def get_NER_lookup(self):
        df_ner = self._NER
        df_ner.dropna(inplace=True)

        # TODO: WRT Monarchia.xml - dropping some conflicting matches (i.e., Summa,
        # De regimine principum ad regem Cypri, Moralium,
        # Memoriale de prerogativa Imperii Romani, Tractatus).
        conflicting_matches = ['Summa', 'De regimine principum ad regem Cypri',
                               'Moralium', 'Memoriale de prerogativa Imperii Romani',
                               'Tractatus']
        for conflict in conflicting_matches:
            # print(f'Dropping: {conflict}')
            df_ner = df_ner.drop(df_ner[df_ner['match'] == conflict].index)

        NER_matches = df_ner['match'].values
        NER_types = df_ner['type'].values

        ner_lookup = {}
        for i, elem in enumerate(NER_matches):
            if elem not in ner_lookup:
                ner_lookup[elem] = NER_types[i]

        ner_clean_lookup = {}
        for i, elem in enumerate(NER_matches):
            # Same keys, but with the tag markers stripped.
            _elem = elem.replace('', '')
            _elem = _elem.replace('', '')
            if _elem not in ner_clean_lookup:
                ner_clean_lookup[_elem] = NER_types[i]

        self.ner_lookup = ner_lookup
        self.ner_clean_lookup = ner_clean_lookup
        return ner_lookup, ner_clean_lookup
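    # Illustrative sketch of what get_NER_lookup() returns, assuming the NER
    # dataframe has a 'match' and a 'type' column; `builder`, the keys and the
    # labels below are hypothetical:
    #
    #   >>> ner_lookup, ner_clean_lookup = builder.get_NER_lookup()
    #   >>> ner_lookup['Aristotele']        # keys as they appear in the tagged text
    #   'PERSON'
    #   >>> ner_clean_lookup['Aristotele']  # same mapping, keys with markers stripped
    #   'PERSON'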
    def _annotate_commentaries(self):
        """
        Get all the matches in the original commentaries (i.e., with tagged keywords)
        in order to retrieve them later from the cleaned commentaries and avoid
        conflicts with subword keys.
        """
        matches_in_commentaries = []
        for comment in self.commentaries:
            res = []
            for k in self.ner_lookup.keys():
                matches = re.finditer(k, comment, re.MULTILINE)
                for i, match in enumerate(matches, start=1):
                    res.append([match.start(), match.end(), match.group()])
            matches_in_commentaries.append(res)

        matches_in_clean_commentaries = []
        for i, match_list in enumerate(matches_in_commentaries):
            res = []
            for k in match_list:
                key = k[2]
                # Strip the tag markers from the matched key before searching the clean text.
                clean_key = key.replace('', '')
                clean_key = clean_key.replace('', '')
                # Word boundaries prevent a key from matching inside a longer word.
                regex = f'\\b{clean_key}\\b'
                matches = re.finditer(regex, self.clean_commentaries[i], re.MULTILINE)
                for j, match in enumerate(matches, start=1):
                    res.append([match.start(), match.end(), match.group()])
            matches_in_clean_commentaries.append(res)

        return matches_in_commentaries, matches_in_clean_commentaries

    def build_train_data(self):
        from collections import OrderedDict

        matches_raw, matches_clean = self._annotate_commentaries()

        TRAIN_DATA = []
        for i, comment in enumerate(self.clean_commentaries):
            text = comment
            ent_dict = {'entities': []}
            for ent in matches_clean[i]:
                _temp = (ent[0], ent[1], self.ner_clean_lookup[ent[2]])
                ent_dict['entities'].append(_temp)
            # Drop duplicate entity spans while preserving their order.
            ent_dict['entities'] = list(OrderedDict.fromkeys(ent_dict['entities']))
            TRAIN_DATA.append((text, ent_dict))

        self.TRAIN_DATA = TRAIN_DATA
        return TRAIN_DATA

    def get_rehearsal_data(self):
        revision_data = []
        print('# NB: TAGGING WITH standard spacy model!')
        nlp_std = spacy.load(self.SPACY_MODEL_STD)
        # nlp_std = spacy.load('./models/')
        for doc in nlp_std.pipe(self.clean_commentaries):
            entities = [(e.start_char, e.end_char, e.label_) for e in doc.ents]
            revision_data.append((doc, {'entities': entities}))
        self.revision_data = revision_data
        return revision_data

    def train_model(self):
        from toolz import partition_all  # See docs @ https://toolz.readthedocs.io/en/latest/
        import random

        nlp_std = spacy.load(self.SPACY_MODEL_STD)

        # revision_data = self.get_rehearsal_data()
        # REHEARSAL_DATA = self.TRAIN_DATA[:300] + revision_data[:150]
        REHEARSAL_DATA = self.import_dataset_doccano('./commentaries/data_parsed/doccano_data/file.json1')
        print(f'Len TRAIN_DATA: {len(REHEARSAL_DATA)}')

        ner = nlp_std.get_pipe('ner')
        for _, annotations in self.TRAIN_DATA:
            for ent in annotations.get('entities'):
                ner.add_label(ent[2])

        pipe_exceptions = ['ner', 'trf_wordpiecer', 'trf_tok2vec']
        other_pipes = [pipe for pipe in nlp_std.pipe_names if pipe not in pipe_exceptions]

        optimizer = nlp_std.resume_training()
        with nlp_std.disable_pipes(*other_pipes), warnings.catch_warnings():
            warnings.filterwarnings('once', category=UserWarning, module='spacy')
            n_epochs = 10
            # batch_size = 32
            print('\n## Begin Training')
            for i in range(n_epochs):
                print(f'Iteration {i+1}')
                losses = {}
                random.shuffle(REHEARSAL_DATA)
                batches = minibatch(REHEARSAL_DATA, size=compounding(4.0, 32.0, 1.001))
                for batch in batches:
                    docs, golds = zip(*batch)
                    nlp_std.update(docs, golds, sgd=optimizer, losses=losses)
                print(f'loss: {losses}')

        test_text = random.choice(self.clean_commentaries)

        def pprint_com(comment, l=100):
            # Print the comment in chunks of l characters.
            i = 0
            while len(comment) > i + l:
                print(comment[i:i + l])
                i += l
            print(comment[i:])

        print('SENTENCE:\n')
        pprint_com(test_text)

        doc = nlp_std(test_text)
        print('\nFINE-TUNED NER MODEL PREDICTIONS:')
        for ent in doc.ents:
            print(ent.text, ent.label_)

        print('-' * 50)
        print('STANDARD NER MODEL PREDICTIONS:')
        nlp_reloaded = spacy.load('it_core_news_sm')
        doc_STD = nlp_reloaded(test_text)
        for ent in doc_STD.ents:
            print(ent.text, ent.label_)
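    # For reference, the entries produced by build_train_data() (and shuffled/batched
    # by train_model()) follow the spaCy v2 training format, with character offsets
    # into the cleaned commentary text. A hypothetical entry:
    #
    #   ('Dante cita Aristotele nella Monarchia.',
    #    {'entities': [(11, 21, 'PERSON'), (28, 37, 'WORK_OF_ART')]})
    #
    # Text, offsets and labels above are illustrative only.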
    def export_dataset_doccano(self, outputfile_name):
        """
        Doccano JSONL data format:

        {"text": "EU rejects German call to boycott British lamb.", "labels": [ [0, 2, "ORG"], [11, 17, "MISC"], ... ]}
        {"text": "Peter Blackburn", "labels": [ [0, 15, "PERSON"] ]}
        {"text": "President Obama", "labels": [ [10, 15, "PERSON"] ]}
        """
        data = self.TRAIN_DATA
        revision_data = self.get_rehearsal_data()

        if data is not None:
            output = self.merge_rehearsed(data, revision_data)
        else:
            output = [{'text': doc[0].text, 'labels': doc[1]['entities']} for doc in revision_data]

        with jsonlines.open(f'./commentaries/data_parsed/doccano_data/{outputfile_name}.jsonl', mode='w') as writer:
            writer.write_all(output)

    def merge_rehearsed(self, data, revision_data):
        res = []
        for i, comment in enumerate(data):
            _tmp = comment[1]['entities'] + revision_data[i][1]['entities']
            res.append({'text': comment[0], 'labels': _tmp})

        labels_solved = [solve_overlap(l['labels']) for l in res]
        for l in labels_solved:
            check_overlap(l)

        # TODO: one "WORK OF ART" label got labelled as "\rnWORK_OF_ART" ???
        for i, solved_ent in enumerate(labels_solved):
            res[i]['labels'] = solved_ent
        return res

    def import_dataset_doccano(self, path):
        data = []
        with open(path) as infile:
            content = infile.read().splitlines()
        for line in content:
            json_data = json.loads(line)
            ent = {'entities': json_data['labels']}
            data.append((json_data['text'], ent))
        self.TRAIN_DATA = data
        return data


# if __name__ == '__main__':
#     data = DataSetBuilder(df_commentary_monarchia, df_commentary_monarchia, NER=df_ner_unique)
#     # ner_lookup, ner_clean_lookup = data.get_NER_lookup()
#     data.get_commentaries()
#     data.build_train_data()
#     data.train_model()
#     data.export_dataset_doccano()
#     data.import_dataset_doccano('./commentaries/data_parsed/doccano_data/file.json1')
#     data.train_model()
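# A minimal sketch (not used by DataSetBuilder) of the conversion between the two
# annotation formats this module shuttles around: a parsed Doccano record,
# {"text": ..., "labels": [[start, end, label], ...]}, and a spaCy v2 training tuple,
# (text, {"entities": [(start, end, label), ...]}). The helper name is hypothetical.
def doccano_record_to_spacy(record):
    """Convert a single parsed Doccano record into a spaCy v2 training tuple."""
    entities = [(start, end, label) for start, end, label in record['labels']]
    return record['text'], {'entities': entities}


# Example:
#   doccano_record_to_spacy({"text": "Peter Blackburn", "labels": [[0, 15, "PERSON"]]})
#   -> ('Peter Blackburn', {'entities': [(0, 15, 'PERSON')]})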