dataset/annotation.py

363 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# --- Import librerie ---
import pandas as pd
from openai import AzureOpenAI
import pickle
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import openpyxl
import re
import json
from openpyxl.styles import PatternFill
from openpyxl import load_workbook
from collections import Counter
from prompts.prompt import build_prompt_local
import warnings
import logging
# --- Configuration ---
# Credentials must never live in source control, so the API key is read from
# the environment. The key that used to be hard-coded here has been exposed
# and should be revoked/rotated in the Azure portal.
import os

# Endpoint and deployment keep their previous values as defaults, but can be
# overridden per environment without editing the script.
endpoint = os.environ.get(
    "AZURE_OPENAI_ENDPOINT",
    "https://gpt-sw-central-tap-security.openai.azure.com/",
)
deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")
subscription_key = os.environ.get("AZURE_OPENAI_API_KEY")
if not subscription_key:
    # Fail fast with an actionable message instead of an opaque 401 later on.
    raise RuntimeError(
        "AZURE_OPENAI_API_KEY is not set; export it before running this script."
    )
client = AzureOpenAI(
    azure_endpoint=endpoint,
    api_key=subscription_key,
    api_version="2024-05-01-preview",
)
# ----- Step 1: load the datasets -----
# Labeled data: semicolon-separated, cp1252-encoded.
# Original note on its columns: automation, description, category,
# subcategory, problem_type, gravity.
df_labeled = pd.read_csv(
    "main/datasets/annotated_dataset.csv",
    sep=';',
    encoding="cp1252",
)
# Unlabeled data: tab-separated, UTF-8.
df_unlabeled = pd.read_csv(
    "main/datasets/unlabeled_dataset.csv",
    encoding='utf-8',
    sep='\t',
)
print(
    "***STEP 1***\nDataset etichettato caricato. Numero righe:",
    len(df_labeled),
    "\nDataset non etichettato caricato. Numero righe:",
    len(df_unlabeled),
)
def clean_id(x):
    """Normalize a raw identifier to its first run of digits.

    Values that ``pd.isna`` reports as missing become ``""``. Any other value
    is converted to a string and the first sequence of digits found in it is
    returned; when the string contains no digit at all, the string itself is
    returned with surrounding whitespace removed.
    """
    if pd.isna(x):
        return ""
    text = str(x)
    first_digits = re.search(r"\d+", text)
    if first_digits is None:
        return text.strip()
    return first_digits.group(0)
# Normalize the join keys on both frames so that (automation_id, folder)
# pairs compare reliably between the labeled and the unlabeled dataset.
for _frame in (df_labeled, df_unlabeled):
    _frame["automation_id"] = _frame["automation_id"].apply(clean_id)
    _frame["folder"] = _frame["folder"].astype(str).str.strip()

# Set of (automation_id, folder) pairs that already carry a label.
labeled_pairs = set(zip(df_labeled["automation_id"], df_labeled["folder"]))

# Build the membership mask from a plain zip over the two key columns rather
# than a row-wise DataFrame.apply: the result is the same, but no Series is
# materialized per row and the mask is a well-defined (empty) boolean Series
# when the unlabeled frame has no rows.
_already_labeled = pd.Series(
    [
        pair in labeled_pairs
        for pair in zip(df_unlabeled["automation_id"], df_unlabeled["folder"])
    ],
    index=df_unlabeled.index,
    dtype=bool,
)
df_unlabeled_filtered = df_unlabeled[~_already_labeled]
print("Automazioni non etichettate rimanenti dopo la pulizia:", len(df_unlabeled_filtered))
# --- Step 2: embeddings ---
# Silence generic warnings. NOTE: this filter is process-wide, so it also
# hides warnings raised by every other library used below.
warnings.filterwarnings("ignore")
# Silence the loggers of transformers / sentence-transformers / HF hub.
for _noisy_logger in ("sentence_transformers", "transformers", "huggingface_hub"):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)
print("\n***Step 2 ***\nEmbeddings")
model = SentenceTransformer("all-MiniLM-L6-v2")
# SECURITY: unpickling executes whatever code the file encodes. Only load a
# pickle produced by this project; never one obtained from an untrusted source.
with open("main/labeled_embeddings.pkl", "rb") as f:
    data = pickle.load(f)
# A C-contiguous float32 matrix is what the FAISS index is fed with; this
# conversion also accepts a plain list of vectors.
embeddings = np.ascontiguousarray(data['embeddings'], dtype="float32")
if embeddings.ndim != 2:
    raise ValueError(
        f"Expected a 2-D embedding matrix, got shape {embeddings.shape}"
    )
# The retrieval step maps FAISS positions straight onto df_labeled rows, so
# the two must line up one-to-one; a stale pickle would otherwise yield
# silently wrong neighbours.
if embeddings.shape[0] != len(df_labeled):
    raise ValueError(
        f"Embeddings/labeled dataset mismatch: {embeddings.shape[0]} vectors "
        f"for {len(df_labeled)} labeled rows; regenerate the embeddings."
    )
print("Shape embeddings:", embeddings.shape)
# ----- Step 3: build the FAISS index -----
# The index is sized from the second axis of the embedding matrix and filled
# with every labeled embedding.
# NOTE(review): the FAISS documentation describes IndexFlatL2 search results
# as *squared* L2 distances — confirm the sim_label thresholds were tuned
# with that in mind.
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension) # L2 (Euclidean distance) index
index.add(embeddings)
print(f"\n***Step 3: Indice FAISS creato***. \nNumero di vettori nell'indice: {index.ntotal}")
# ----- Step 4: retrieval (similarity) -----
# Trial run on the first 500 automations that are still unlabeled.
df_sample = df_unlabeled_filtered.iloc[:500]
k = 5  # number of nearest neighbours requested per query
# Accumulators filled by the retrieval loop.
output_rows, llm_rows = [], []
def sim_label(distance: float) -> str:
    """Translate a retrieval distance into a qualitative similarity label.

    The bands are checked in ascending order and their upper bounds are
    inclusive. A distance that falls in none of them (including NaN, for
    which every comparison is false) is reported as "Troppo distante".
    """
    bands = (
        (0.50, "Match forte"),
        (0.75, "Match plausibile"),
        (0.90, "Similarità instabile"),
    )
    for upper_bound, band_label in bands:
        if distance <= upper_bound:
            return band_label
    return "Troppo distante"
# Thresholds under which agreement between retrieval and the LLM is trusted
# enough to waive a human review requested by the model (loop-invariant).
OVERRIDE_MAX_DISTANCE = 0.90
OVERRIDE_MIN_CONSISTENCY = 0.60


def _as_text(value):
    """Return *value* as a stripped string; None (JSON null) becomes ''."""
    if value is None:
        return ""
    return str(value).strip()


def _fallback_answer(query_text, rationale):
    """Build the placeholder answer used when no usable LLM output exists."""
    return {
        "automation": query_text,
        "category": "",
        "subcategory": "",
        "problem_type": "",
        "gravity": "",
        "scores": {},
        "needs_human_review": True,
        "short_rationale": rationale,
    }


def _parse_llm_answer(content, query_text):
    """Decode the model output into a dict, falling back to a review record.

    A Markdown code fence around the JSON (``` or ```json) is tolerated.
    Anything that is not a JSON object yields the fallback answer.
    """
    fenced = re.fullmatch(
        r"```(?:json)?\s*(.*?)\s*```", content, flags=re.DOTALL | re.IGNORECASE
    )
    payload = fenced.group(1) if fenced else content
    try:
        parsed = json.loads(payload)
    except ValueError:  # json.JSONDecodeError is a ValueError
        parsed = None
    if not isinstance(parsed, dict):
        return _fallback_answer(query_text, f"JSON_PARSE_ERROR: {content[:200]}")
    return parsed


for i, row in df_sample.iterrows():
    query_text = str(row["human_like"])
    print("numero corrente:", i)

    # Embed the new automation and fetch its k nearest labeled neighbours.
    query_emb = model.encode([query_text], convert_to_numpy=True).astype("float32")
    distances, indices = index.search(query_emb, k)

    # FAISS documents that missing neighbours are reported with index -1 when
    # the index holds fewer than k vectors; drop those slots, otherwise
    # iloc[-1] would silently select the last labeled row.
    hits = [
        (int(position), float(dist))
        for position, dist in zip(indices[0], distances[0])
        if position >= 0
    ]

    # Retrieved neighbours with their retrieval metrics (also fed to the prompt).
    retrieved = df_labeled.iloc[[position for position, _ in hits]].copy()
    retrieved["distance"] = [dist for _, dist in hits]
    retrieved["confidence"] = retrieved["distance"].apply(lambda d: 1 / (1 + float(d)))
    retrieved["similarity"] = retrieved["distance"].apply(sim_label)

    # Aggregate top-k metrics (computed once per automation).
    topk_cats = [str(category) for category in retrieved["category"]]
    top1_distance = hits[0][1] if hits else float("inf")
    top1_confidence = 1 / (1 + top1_distance)
    top1_similarity_label = sim_label(top1_distance)
    rank1_category = topk_cats[0] if topk_cats else ""
    if topk_cats:
        majority_category, majority_votes = Counter(topk_cats).most_common(1)[0]
        consistency = majority_votes / len(topk_cats)
    else:
        majority_category, consistency = "", 0.0

    # One detail record per (query, neighbour) pair.
    for rank, (position, distance) in enumerate(hits, start=1):
        retrieved_row = df_labeled.iloc[position]
        output_rows.append({
            # query
            "automazione da etichettare": query_text,
            # retrieval info for this neighbour
            "rank": rank,
            "retrieved_idx": position,
            "automazione simile": retrieved_row["automation"],
            "categoria automazione simile": retrieved_row["category"],
            "distanza": distance,
            "confidence": round(1 / (1 + distance), 4),
            "similarity": sim_label(distance),
            # aggregate top-k metrics (repeated on every row)
            "rank1_distance": top1_distance,
            "rank1_confidence": round(top1_confidence, 4),
            "rank1_similarity_label": top1_similarity_label,
            "rank1_category": rank1_category,
            "majority_category": majority_category,
            "consistency": round(consistency, 3),
            "top5_categories": " | ".join(topk_cats),
        })

    # --- Step 5: send the data to the LLM ---
    # (1) Build the prompt
    prompt = build_prompt_local(query_text, retrieved, sim_label)

    # (2) Call the model, which is asked to return JSON. A failed call (rate
    # limit, content filter, network, empty response) must not abort the whole
    # batch and discard the rows collected so far: it is recorded as a row
    # that needs human review instead.
    try:
        resp = client.chat.completions.create(
            model=deployment,
            messages=[
                {"role": "system", "content": "Return ONLY valid JSON. No extra text."},
                {"role": "user", "content": prompt},
            ],
            temperature=0,
        )
        # message.content can be None (e.g. filtered output).
        content = (resp.choices[0].message.content or "").strip()
    except Exception as exc:  # batch boundary: report, flag the row, keep going
        print(f"LLM call failed for row {i}: {exc}")
        parsed = _fallback_answer(query_text, f"LLM_CALL_ERROR: {str(exc)[:200]}")
    else:
        # (3) Parse the answer
        parsed = _parse_llm_answer(content, query_text)

    # (4) One row per automation with:
    #     - retrieval metrics (rank1/majority/consistency)
    #     - LLM output (final label + review flag)
    # Values are coerced to text because the model may emit null or
    # non-string values for any of these fields.
    llm_category = _as_text(parsed.get("category"))
    llm_subcategory = _as_text(parsed.get("subcategory"))
    llm_problem_type = _as_text(parsed.get("problem_type"))
    llm_gravity = _as_text(parsed.get("gravity"))
    if llm_category.upper() == "HARMLESS":
        llm_subcategory = ""
        llm_problem_type = "NONE"
        llm_gravity = "NONE"
    # The final labels are taken AFTER the HARMLESS normalization so that the
    # final_* columns agree with the normalized llm_* columns.
    final_category = llm_category
    final_subcategory = llm_subcategory
    final_problem_type = llm_problem_type
    final_gravity = llm_gravity

    # ================= HUMAN REVIEW LOGIC =================
    needs_human_review = bool(parsed.get("needs_human_review", True))
    # Strong alignment: LLM = majority = top1
    aligned_strong = (
        llm_category != "" and
        llm_category == majority_category and
        llm_category == rank1_category
    )
    # Distance not excessive and good consistency
    good_retrieval = (
        top1_distance <= OVERRIDE_MAX_DISTANCE and
        consistency >= OVERRIDE_MIN_CONSISTENCY
    )
    # Then NO review is required, even if the model had asked for one
    if aligned_strong and good_retrieval:
        needs_human_review = False
    # =====================================================

    llm_rows.append({
        "automation_id": row.get("automation_id", ""),
        "folder": row.get("folder", ""),
        "automation_text": query_text,
        "rank1_distance": top1_distance,
        "rank1_confidence": round(top1_confidence, 4),
        "rank1_similarity_label": top1_similarity_label,
        "rank1_category": rank1_category,
        "majority_category": majority_category,
        "consistency": round(consistency, 3),
        "top5_categories": " | ".join(topk_cats),
        "llm_category": llm_category,
        "llm_subcategory": llm_subcategory,
        "llm_problem_type": llm_problem_type,
        "llm_gravity": llm_gravity,
        "llm_needs_human_review": parsed.get("needs_human_review", True),
        "final_needs_human_review": needs_human_review,
        "final_category": final_category,
        "final_subcategory": final_subcategory,
        "final_problem_type": final_problem_type,
        "final_gravity": final_gravity,
        "llm_rationale": parsed.get("short_rationale", ""),
    })
# --- Step 6: integration and output ---
# (5) Export the final output as a dataframe, written to an Excel workbook
out_path = "main/datasets/labeling_first500.xlsx"
df_llm = pd.DataFrame(llm_rows)
df_llm.to_excel(out_path, index=False)

# Re-open the workbook to colour the review-flag cells.
wb = load_workbook(out_path)
ws = wb.active
# Colours for needs_human_review
true_fill = PatternFill(start_color="FF6347", end_color="FF6347", fill_type="solid")  # red
false_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid")  # green
# Header text -> 1-based column number, read from the first worksheet row.
col_index = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}
for flag_column in ("llm_needs_human_review", "final_needs_human_review"):
    if flag_column not in col_index:
        continue
    c = col_index[flag_column]
    for r in range(2, ws.max_row + 1):
        flag_cell = ws.cell(row=r, column=c)
        # Only genuine booleans are coloured; any other value stays as is.
        if flag_cell.value is True:
            flag_cell.fill = true_fill
        elif flag_cell.value is False:
            flag_cell.fill = false_fill
wb.save(out_path)
print(f"\n***Step 6: Retrieval e LLM ***\nExcel LLM salvato in {out_path}")
# --- needs_human_review tally ---
# Count the final review flags; a flag value that never occurs counts as 0.
review_counts = df_llm["final_needs_human_review"].value_counts(dropna=False)
true_count, false_count = (review_counts.get(flag, 0) for flag in (True, False))
print(
    "\n--- Needs human review summary ---",
    f"needs_human_review = True : {true_count}",
    f"needs_human_review = False: {false_count}",
    sep="\n",
)
# --- Step 7: final dataset over all processed automations (text + labels only) ---
# Source column -> final column name; the insertion order drives both the
# column order of df_final and the order of the normalization loop.
_label_columns = {
    "llm_category": "category",
    "llm_subcategory": "subcategory",
    "llm_gravity": "gravity",
    "llm_problem_type": "problem_type",
}
_selected = ["automation_text", *_label_columns, "final_needs_human_review"]
df_final = df_llm[_selected].rename(columns=_label_columns)
# Normalize the label strings: missing -> "", cast to str, trim whitespace.
for col in _label_columns.values():
    df_final[col] = df_final[col].fillna("").astype(str).str.strip()
# Creazione DataFrame risultati
# df_results = pd.DataFrame(output_rows)
# output_path = "main/datasets/similarity_analysis.xlsx"
# df_results.to_excel(output_path, index=False)
#wb = load_workbook(output_path)
#ws = wb.active
#distanza_col_idx = None
#for idx, cell in enumerate(ws[1], start=1):
#if cell.value == "distanza":
#distanza_col_idx = idx
#break
#if distanza_col_idx is None:
#raise ValueError("Colonna 'distanza' non trovata!")
# Applichiamo i colori in base al valore
#for row in ws.iter_rows(min_row=2, max_row=ws.max_row, min_col=distanza_col_idx, max_col=distanza_col_idx):
#cell = row[0]
#try:
#val = float(cell.value)
#if val < 0.5:
#color = "90EE90" # verde chiaro
#elif val < 1.0:
#color = "FFFF00" # giallo
#else:
#color = "FF6347" # rosso
#cell.fill = PatternFill(start_color=color, end_color=color, fill_type="solid")
#except:
#continue
# Salva il file direttamente con colori applicati
#wb.save(output_path)
#print(f"Excel salvato in {output_path}")