import pandas as pd
import spacy
from spacy.util import minibatch, compounding
import regex as re
import pickle
from pprint import pprint
from common.utils import solve_overlap, check_overlap
import warnings
import jsonlines
import json
import random

COMMENTARIES_PATH = './commentaries/'
DF_COMMENTARIES_PATH = './commentaries/data_parsed/'

df_commentary_monarchia = pd.read_csv(DF_COMMENTARIES_PATH + 'monarchia_DF.csv')
df_ner_unique = pd.read_csv(DF_COMMENTARIES_PATH + 'ner_unique_monarchia.csv')

"""
|
|
df_ner_unique ATM contains <i>terms</i> found in "De Monarchia". The .csv file should
|
|
contain all the occurrences of tagged terms across all of the (tagged) documents!
|
|
"""
|
|
|
|
|
|
class DataSetBuilder:
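    """
    Builds spaCy NER training data from the tagged commentary texts (e.g., on
    "De Monarchia").

    The class strips the <i>...</i> keyword tags from the raw commentaries,
    builds a lookup from tagged terms to entity types, annotates the cleaned
    texts with character-offset entities, and fine-tunes the standard Italian
    spaCy model on the result. It can also export/import the dataset in
    Doccano's JSONL format.
    """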

    SPACY_MODEL_STD = 'it_core_news_sm'

    def __init__(self, commentaries_tr, commentaries_eva, NER=None):
        self._commentaries_df = commentaries_tr
        self._commentaries_eva = commentaries_eva
        self.commentaries = None
        self.clean_commentaries = None
        self.commentaries_eva = None
        self.clean_commentaries_eva = None
        self.ner_lookup = None
        self.ner_clean_lookup = None
        self._NER = NER
        self.TRAIN_DATA = None
        self.get_commentaries()

    def get_commentaries(self):
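        """
        Extract the unique commentary texts from the training and evaluation
        dataframes, plus copies with the <i>...</i> tags stripped.
        """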
        commentaries_tr = self._commentaries_df['comment'].unique()
        commentaries_ev = self._commentaries_eva['comment'].unique()
        clean_commentaries = []
        for commentary in [commentaries_tr, commentaries_ev]:
            commentary_list = []
            for comment in commentary:
                cleaned = comment.replace('<i>', '')
                cleaned = cleaned.replace('</i>', '')
                commentary_list.append(cleaned)
            clean_commentaries.append(commentary_list)

        self.commentaries = commentaries_tr
        self.clean_commentaries = clean_commentaries[0]
        self.commentaries_eva = commentaries_ev
        self.clean_commentaries_eva = clean_commentaries[1]
        return commentaries_tr, commentaries_ev, clean_commentaries

    def get_NER_lookup(self):
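        """
        Build two {term: entity_type} lookups from the NER dataframe: one keyed
        on the raw tagged matches (<i>...</i>) and one keyed on the cleaned text.
        """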
        df_ner = self._NER
        df_ner.dropna(inplace=True)

        """
        TODO: WRT Monarchia.xml - dropping some conflicting matches (i.e., Summa,
              De regimine principum ad regem Cypri, Moralium,
              Memoriale de prerogativa Imperii Romani, Tractatus)
        """

        conflicting_matches = ['<i>Summa</i>', '<i>De regimine principum ad regem Cypri</i>',
                               '<i>Moralium</i>',
                               '<i>Memoriale de prerogativa Imperii Romani</i>',
                               '<i>Tractatus</i>']
        for conflict in conflicting_matches:
            # print(f'Dropping: {conflict}')
            df_ner = df_ner.drop(df_ner[df_ner['match'] == conflict].index)

        NER_matches = df_ner['match'].values
        NER_types = df_ner['type'].values

        ner_lookup = {}
        for i, elem in enumerate(NER_matches):
            if elem not in ner_lookup:
                ner_lookup[elem] = NER_types[i]

        ner_clean_lookup = {}
        for i, elem in enumerate(NER_matches):
            _elem = elem.replace('<i>', '')
            _elem = _elem.replace('</i>', '')
            if _elem not in ner_clean_lookup:
                ner_clean_lookup[_elem] = NER_types[i]

        self.ner_lookup = ner_lookup
        self.ner_clean_lookup = ner_clean_lookup

        return ner_lookup, ner_clean_lookup

    def _annotate_commentaries(self):
        """
        Find all matches of the lookup keys in the original commentaries (i.e.,
        with the <i>...</i> tagged keywords), so that the same keywords can be
        located again in the cleaned commentaries without clashing with
        sub-word matches.
        """
        matches_in_commentaries = []
        for comment in self.commentaries:
            res = []
            for k in self.ner_lookup.keys():
                # Escape the key so any regex metacharacters in the titles are
                # matched literally.
                matches = re.finditer(re.escape(k), comment, re.MULTILINE)
                for i, match in enumerate(matches, start=1):
                    res.append([match.start(), match.end(), match.group()])
            matches_in_commentaries.append(res)

        matches_in_clean_commentaries = []

        for i, match_list in enumerate(matches_in_commentaries):
            res = []
            for k in match_list:
                key = k[2]
                clean_key = key.replace('<i>', '')
                clean_key = clean_key.replace('</i>', '')
                regex = f'\\b{re.escape(clean_key)}\\b'
                matches = re.finditer(regex, self.clean_commentaries[i], re.MULTILINE)
                for j, match in enumerate(matches, start=1):
                    res.append([match.start(), match.end(), match.group()])
            matches_in_clean_commentaries.append(res)

        return matches_in_commentaries, matches_in_clean_commentaries

    def build_train_data(self):
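        """
        Assemble spaCy-style training data: a list of
        (text, {'entities': [(start, end, label), ...]}) tuples built from the
        cleaned commentaries and the clean NER lookup.
        """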
        from collections import OrderedDict

        matches_raw, matches_clean = self._annotate_commentaries()
        TRAIN_DATA = []
        for i, comment in enumerate(self.clean_commentaries):
            text = comment
            ent_dict = {'entities': []}
            for ent in matches_clean[i]:
                _temp = (ent[0], ent[1], self.ner_clean_lookup[ent[2]])
                ent_dict['entities'].append(_temp)
            # Deduplicate the entity tuples while preserving their order.
            ent_dict['entities'] = list(OrderedDict.fromkeys(ent_dict['entities']))
            TRAIN_DATA.append((text, ent_dict))

        self.TRAIN_DATA = TRAIN_DATA
        return TRAIN_DATA

    def get_rehearsal_data(self):
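        """
        Tag the cleaned commentaries with the stock Italian spaCy model and
        return (doc, {'entities': [...]}) pairs, to be mixed into training as
        rehearsal data so the fine-tuned model does not forget the base labels.
        """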
        revision_data = []
        print('# NB: TAGGING WITH standard spacy model!')
        nlp_std = spacy.load(self.SPACY_MODEL_STD)
        # nlp_std = spacy.load('./models/')

        for doc in nlp_std.pipe(self.clean_commentaries):
            entities = [(e.start_char, e.end_char, e.label_) for e in doc.ents]
            revision_data.append((doc, {'entities': entities}))
        self.revision_data = revision_data
        return revision_data

    def train_model(self):
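        """
        Fine-tune the standard Italian spaCy NER pipe on the Doccano-exported
        dataset (loaded via import_dataset_doccano), then print predictions for
        a random commentary from both the fine-tuned and the stock model.
        """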
        from toolz import partition_all  # See Docs @ https://toolz.readthedocs.io/en/latest/

        nlp_std = spacy.load(self.SPACY_MODEL_STD)

        # revision_data = self.get_rehearsal_data()
        # REHEARSAL_DATA = self.TRAIN_DATA[:300] + revision_data[:150]

        REHEARSAL_DATA = self.import_dataset_doccano('./commentaries/data_parsed/doccano_data/file.json1')
        print(f'Len TRAIN_DATA: {len(REHEARSAL_DATA)}')

        ner = nlp_std.get_pipe('ner')
        for _, annotations in self.TRAIN_DATA:
            for ent in annotations.get("entities"):
                ner.add_label(ent[2])

        pipe_exceptions = ["ner", "trf_wordpiecer", "trf_tok2vec"]
        other_pipes = [pipe for pipe in nlp_std.pipe_names if pipe not in pipe_exceptions]

        optimizer = nlp_std.resume_training()

        # NB: the two context managers must be combined with a comma; chaining
        # them with `and` would leave the disabled pipes never restored on exit.
        with nlp_std.disable_pipes(*other_pipes), warnings.catch_warnings():
            warnings.filterwarnings("once", category=UserWarning, module='spacy')

            n_epochs = 10
            # batch_size = 32
            print('\n## Begin Training')
            for i in range(n_epochs):
                print(f'Iteration {i+1}')
                losses = {}
                random.shuffle(REHEARSAL_DATA)
                batches = minibatch(REHEARSAL_DATA, size=compounding(4.0, 32.0, 1.001))
                for batch in batches:
                    # for batch in partition_all(batch_size, REHEARSAL_DATA):
                    docs, golds = zip(*batch)
                    # texts, annotations = zip(*batch)
                    nlp_std.update(docs, golds, sgd=optimizer, losses=losses)
                print(f'loss: {losses}')

        test_text = self.clean_commentaries[random.randrange(len(self.clean_commentaries))]

        def pprint_com(comment, l=100):
            # Print the comment in chunks of l characters per line.
            i = 0
            while len(comment) > i + l:
                j = i + l
                print(comment[i:j])
                i += l
            print(comment[i:len(comment)])

        print('SENTENCE:\n')
        pprint_com(test_text)
        doc = nlp_std(test_text)
        print('\nFINE-TUNED NER MODEL PREDICTIONS:')
        for ent in doc.ents:
            print(ent.text, ent.label_)

        print('-' * 50)
        print('STANDARD NER MODEL PREDICTIONS:')
        nlp_reloaded = spacy.load(self.SPACY_MODEL_STD)
        doc_STD = nlp_reloaded(test_text)
        for ent in doc_STD.ents:
            print(ent.text, ent.label_)

    def export_dataset_doccano(self, outputfile_name):
"""
|
|
Doccano JSONL data format:
|
|
{"text": "EU rejects German call to boycott British lamb.", "labels": [ [0, 2, "ORG"], [11, 17, "MISC"], ... ]}
|
|
{"text": "Peter Blackburn", "labels": [ [0, 15, "PERSON"] ]}
|
|
{"text": "President Obama", "labels": [ [10, 15, "PERSON"] ]}
|
|
"""
|
|
        data = self.TRAIN_DATA
        revision_data = self.get_rehearsal_data()
        if data is not None:
            output = self.merge_rehearsed(data, revision_data)
        else:
            output = [{'text': doc[0].text, 'labels': doc[1]['entities']} for doc in revision_data]

        with jsonlines.open(f'./commentaries/data_parsed/doccano_data/{outputfile_name}.jsonl', mode='w') as writer:
            writer.write_all(output)

    def merge_rehearsed(self, data, revision_data):
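        """
        Merge the hand-tagged entities with the rehearsal annotations for each
        commentary, resolve overlapping spans with solve_overlap, and sanity
        check the result with check_overlap.
        """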
        res = []
        for i, comment in enumerate(data):
            _tmp = comment[1]['entities'] + revision_data[i][1]['entities']
            res.append({'text': comment[0], 'labels': _tmp})

        labels_solved = [solve_overlap(l['labels']) for l in res]

        for l in labels_solved:
            check_overlap(l)
        # TODO: one "WORK OF ART" label got labelled as "\rnWORK_OF_ART" ???

        for i, solved_ent in enumerate(labels_solved):
            res[i]['labels'] = solved_ent

        return res

    def import_dataset_doccano(self, path):
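        """
        Load a Doccano JSONL export from `path` and convert it back into spaCy
        training tuples, storing the result in self.TRAIN_DATA.
        """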
        data = []
        with open(path) as infile:
            content = infile.read().splitlines()

        for line in content:
            json_data = json.loads(line)
            ent = {'entities': []}
            ent['entities'] = json_data['labels']
            data.append((json_data['text'], ent))

        self.TRAIN_DATA = data
        return data


if __name__ == '__main__':
    # NB: only the Monarchia commentary is loaded above, so the same dataframe
    # is used here for both the training and the evaluation split; swap in a
    # separate evaluation dataframe when one is available.
    data = DataSetBuilder(df_commentary_monarchia, df_commentary_monarchia, NER=df_ner_unique)

    ner_lookup, ner_clean_lookup = data.get_NER_lookup()
    data.get_commentaries()
    # data.build_train_data()
    # data.train_model()
    # data.export_dataset_doccano()
    # data.import_dataset_doccano('./commentaries/data_parsed/doccano_data/file.json1')
    # data.train_model()