316 lines
11 KiB
Python
316 lines
11 KiB
Python
# --- Import librerie ---
|
|
import pandas as pd
|
|
from openai import AzureOpenAI
|
|
import pickle
|
|
from sentence_transformers import SentenceTransformer
|
|
import numpy as np
|
|
import faiss
|
|
import openpyxl
|
|
import re
|
|
import json
|
|
from openpyxl.styles import PatternFill
|
|
from openpyxl import load_workbook
|
|
from collections import Counter
|
|
from prompts.prompt import build_prompt_local
|
|
import warnings
|
|
import logging
|
|
import unicodedata
|
|
|
|
# --- Configurazione ---
|
|
endpoint = "https://gpt-sw-central-tap-security.openai.azure.com/"
|
|
deployment = "gpt-5.1-chat-3"
|
|
subscription_key = "8zufUIPs0Dijh0M6NpifkkDvxJHZMFtott7u8V8ySTYNcpYVoRbsJQQJ99BBACfhMk5XJ3w3AAABACOGr6sq"
|
|
|
|
client = AzureOpenAI(
|
|
azure_endpoint=endpoint,
|
|
api_key=subscription_key,
|
|
api_version="2025-04-01-preview",
|
|
)
|
|
|
|
# ----- Step 1: caricare datasets -----
|
|
#df_labeled = pd.read_excel("main/datasets/annotated_dataset.xlsx").dropna(how="all")
|
|
df_labeled = pd.read_excel("main/datasets/annotated_dataset_updated.xlsx").dropna(how="all")
|
|
df_unlabeled = pd.read_excel("main/datasets/unlabeled_dataset.xlsx").dropna(how="all")
|
|
print("***STEP 1***\nDataset etichettato caricato. Numero righe:", len(df_labeled), "\nDataset non etichettato caricato. Numero righe:", len(df_unlabeled))
|
|
df_labeled = df_labeled.rename(columns={"automation_id": "id"})
|
|
df_unlabeled = df_unlabeled.rename(columns={"automation_id": "id"})
|
|
|
|
# Pulizia colonne
|
|
def clean_id(x):
|
|
if pd.isna(x):
|
|
return ""
|
|
s = str(x).strip() # rimuove spazi
|
|
s = s.strip('"').strip("'") # rimuove eventuali virgolette
|
|
return s.lower()
|
|
|
|
def clean_folder(x):
|
|
"""Pulizia folder: rimuove spazi multipli, normalizza unicode."""
|
|
if pd.isna(x):
|
|
return ""
|
|
s = str(x).strip().lower()
|
|
s = unicodedata.normalize("NFKC", s)
|
|
s = re.sub(r'\s+', ' ', s)
|
|
return s
|
|
|
|
for df in [df_labeled, df_unlabeled]:
|
|
df["id"] = df["id"].apply(clean_id)
|
|
df["folder"] = df["folder"].apply(clean_folder)
|
|
|
|
labeled_pairs = set(zip(df_labeled["id"], df_labeled["folder"]))
|
|
|
|
# crea maschera: True = la riga NON è presente in labeled
|
|
mask_unlabeled = ~df_unlabeled.apply(lambda r: (r["id"], r["folder"]) in labeled_pairs, axis=1)
|
|
# filtra
|
|
df_unlabeled_filtered = df_unlabeled[mask_unlabeled].copy()
|
|
|
|
print("Numero righe df_unlabeled dopo aver rimosso quelle già etichettate:", len(df_unlabeled_filtered))
|
|
|
|
unlabeled_pairs = set(zip(df_unlabeled["id"], df_unlabeled["folder"]))
|
|
missing_in_unlabeled = labeled_pairs - unlabeled_pairs
|
|
print("Numero coppie etichettate non presenti in unlabeled:", len(missing_in_unlabeled))
|
|
if missing_in_unlabeled:
|
|
print("Coppie mancanti:")
|
|
for p in list(missing_in_unlabeled)[:50]: # stampa solo le prime 50 per comodità
|
|
print(p)
|
|
|
|
# ----- Step 2: embeddings -----
|
|
# Silenzia warning generici
|
|
warnings.filterwarnings("ignore")
|
|
# Silenzia logging di transformers / sentence-transformers / HF hub
|
|
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
|
|
logging.getLogger("transformers").setLevel(logging.ERROR)
|
|
logging.getLogger("huggingface_hub").setLevel(logging.ERROR)
|
|
|
|
print("\n***Step 2***\nEmbeddings")
|
|
model = SentenceTransformer("all-MiniLM-L6-v2")
|
|
|
|
#with open("main/labeled_embeddings_71.pkl", "rb") as f:
|
|
with open("main/labeled_embeddings.pkl", "rb") as f:
|
|
data = pickle.load(f)
|
|
|
|
embeddings = data["embeddings"].astype("float32")
|
|
print("Shape embeddings:", embeddings.shape)
|
|
|
|
|
|
|
|
# ----- Step3: Creazione indice FAISS ---
|
|
faiss.normalize_L2(embeddings)
|
|
dimension = embeddings.shape[1]
|
|
index = faiss.IndexFlatIP(dimension)
|
|
index.add(embeddings)
|
|
print(f"\n***Step 3: Indice FAISS creato***.\nNumero di vettori nell'indice: {index.ntotal}")
|
|
|
|
|
|
# ----- Step 4: Retrieval (similarità cosine) -----
|
|
k = 5
|
|
output_rows = []
|
|
df_sample = df_unlabeled_filtered.head(10).reset_index(drop=True)
|
|
llm_rows = []
|
|
|
|
def sim_label(sim: float) -> str:
|
|
# più alto = più simile
|
|
if sim >= 0.80:
|
|
return "Match forte"
|
|
elif sim >= 0.60:
|
|
return "Match plausibile"
|
|
elif sim >= 0.50:
|
|
return "Similarità instabile"
|
|
else:
|
|
return "Debole"
|
|
|
|
for count, (_, row) in enumerate(df_sample.iterrows(), start=1):
|
|
query_text = str(row["human_like"])
|
|
print("automazione analizzata:", count)
|
|
|
|
# Calcolo embedding della nuova automazione
|
|
query_emb = model.encode([query_text], convert_to_numpy=True).astype("float32")
|
|
faiss.normalize_L2(query_emb)
|
|
|
|
# Recupera indici dei k vicini più prossimi
|
|
sims, indices = index.search(query_emb, k)
|
|
|
|
# Metriche globali sui top-k (una volta per automazione)
|
|
topk_cats = []
|
|
top1_sim = float(sims[0][0])
|
|
top1_similarity_label = sim_label(top1_sim)
|
|
|
|
for rank in range(k):
|
|
idx = int(indices[0][rank])
|
|
sim = float(sims[0][rank])
|
|
|
|
retrieved_row = df_labeled.iloc[idx]
|
|
topk_cats.append(str(retrieved_row.get("category", "")))
|
|
|
|
rank1_category = topk_cats[0] if topk_cats else ""
|
|
majority_category = Counter(topk_cats).most_common(1)[0][0] if topk_cats else ""
|
|
consistency = (sum(c == majority_category for c in topk_cats) / len(topk_cats)) if topk_cats else 0.0
|
|
|
|
for rank in range(k):
|
|
idx = int(indices[0][rank])
|
|
sim = float(sims[0][rank])
|
|
label = sim_label(sim)
|
|
|
|
retrieved_row = df_labeled.iloc[idx]
|
|
|
|
output_rows.append({
|
|
"automazione da etichettare": query_text,
|
|
# info retrieval per questa riga
|
|
"rank": rank + 1,
|
|
"retrieved_idx": idx,
|
|
"automazione simile": retrieved_row.get("automation", ""),
|
|
"categoria automazione simile": retrieved_row.get("category", ""),
|
|
"similarita_cosine": sim,
|
|
"similarity_label": label,
|
|
# metriche aggregate top-k (ripetute su ogni riga)
|
|
"rank1_similarity": top1_sim,
|
|
"rank1_similarity_label": top1_similarity_label,
|
|
"rank1_category": rank1_category,
|
|
"majority_category": majority_category,
|
|
"consistency": round(consistency, 3),
|
|
"top5_categories": " | ".join(topk_cats),
|
|
})
|
|
|
|
|
|
# ----- Step 5: invio dati al LLM -----
|
|
# (1) Costruzione prompt
|
|
retrieved = df_labeled.iloc[indices[0]].copy()
|
|
retrieved["similarity"] = sims[0].astype(float)
|
|
retrieved["similarity_label"] = retrieved["similarity"].apply(sim_label)
|
|
prompt = build_prompt_local(query_text, retrieved, sim_label)
|
|
|
|
# (2) Chiamata al modello: restituisce JSON
|
|
resp = client.chat.completions.create(
|
|
model=deployment,
|
|
messages=[
|
|
{"role": "system", "content": prompt},
|
|
{"role": "user", "content": f'automation to evaluate: {query_text}'}
|
|
],
|
|
reasoning_effort= "low"
|
|
)
|
|
content = resp.choices[0].message.content.strip()
|
|
|
|
# (3) Parsing della risposta
|
|
try:
|
|
parsed = json.loads(content)
|
|
except Exception:
|
|
parsed = {
|
|
"automation": query_text,
|
|
"category": "",
|
|
"subcategory": "",
|
|
"problem_type": "",
|
|
"gravity": "",
|
|
"scores": {},
|
|
"needs_human_review": True,
|
|
"short_rationale": f"JSON_PARSE_ERROR: {content[:200]}",
|
|
}
|
|
|
|
# (4) Salvataggio di 1 riga per automazione con:
|
|
# - metriche retrieval (rank1/majority/consistency)
|
|
# - output dell'LLM (scores + label finale + review flag)
|
|
llm_category = str(parsed.get("category", "")).strip()
|
|
llm_subcategory = str(parsed.get("subcategory", "")).strip()
|
|
llm_problem_type = str(parsed.get("problem_type", "")).strip()
|
|
llm_gravity = str(parsed.get("gravity", "")).strip()
|
|
if llm_category.upper() == "HARMLESS":
|
|
llm_subcategory = ""
|
|
llm_problem_type = "none"
|
|
llm_gravity = "NONE"
|
|
# di default l'etichetta assegnata è quella del LLM - rivista se review=true
|
|
final_category = llm_category
|
|
final_subcategory = llm_subcategory
|
|
final_problem_type = llm_problem_type
|
|
final_gravity = llm_gravity
|
|
|
|
|
|
if top1_similarity_label == "Debole" or top1_similarity_label == "Similarità instabile":
|
|
needs_review = True
|
|
else:
|
|
needs_review = False
|
|
|
|
|
|
final_needs_review = needs_review
|
|
# ================= HUMAN REVIEW LOGIC =================
|
|
aligned_strong = (
|
|
llm_category == majority_category
|
|
and llm_category == rank1_category
|
|
and llm_category != ""
|
|
)
|
|
|
|
OVERRIDE_MIN_SIMILARITY = 0.39
|
|
OVERRIDE_MIN_CONSISTENCY = 0.60
|
|
|
|
good_retrieval = (
|
|
top1_sim >= OVERRIDE_MIN_SIMILARITY
|
|
and consistency >= OVERRIDE_MIN_CONSISTENCY
|
|
)
|
|
|
|
if aligned_strong and good_retrieval:
|
|
final_needs_review = False
|
|
# =====================================================
|
|
|
|
|
|
llm_rows.append({
|
|
"id": row.get("id", ""),
|
|
"folder": row.get("folder", ""),
|
|
"automation_text": query_text,
|
|
|
|
# Retrieval metrics
|
|
"rank1_similarity": top1_sim,
|
|
"rank1_similarity_label": top1_similarity_label,
|
|
"rank1_category": rank1_category,
|
|
"majority_category": majority_category,
|
|
"consistency": round(consistency, 3),
|
|
"top5_categories": " | ".join(topk_cats),
|
|
|
|
# LLM
|
|
"llm_category": llm_category,
|
|
"llm_subcategory": llm_subcategory,
|
|
"llm_problem_type": llm_problem_type,
|
|
"llm_gravity": llm_gravity,
|
|
|
|
"needs_review": needs_review,
|
|
"final_needs_review": final_needs_review,
|
|
|
|
# FINAL
|
|
"final_category": final_category,
|
|
"final_subcategory": final_subcategory,
|
|
"final_problem_type": final_problem_type,
|
|
"final_gravity": final_gravity,
|
|
|
|
"llm_rationale": parsed.get("short_rationale", ""),
|
|
})
|
|
|
|
|
|
# ----- Step 6: output Excel -----
|
|
df_out = pd.DataFrame(llm_rows)
|
|
out_path = "main/datasets/labeling_2_500.xlsx"
|
|
df_out.to_excel(out_path, index=False)
|
|
|
|
wb = load_workbook(out_path)
|
|
ws = wb.active
|
|
|
|
true_fill = PatternFill(start_color="FF6347", end_color="FF6347", fill_type="solid") # rosso
|
|
false_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid") # verde
|
|
|
|
col_index = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}
|
|
|
|
for col_name in ["needs_review", "final_needs_review"]:
|
|
if col_name in col_index:
|
|
c = col_index[col_name]
|
|
for r in range(2, ws.max_row + 1):
|
|
val = ws.cell(row=r, column=c).value
|
|
if val is True:
|
|
ws.cell(row=r, column=c).fill = true_fill
|
|
elif val is False:
|
|
ws.cell(row=r, column=c).fill = false_fill
|
|
|
|
wb.save(out_path)
|
|
print(f"\n***Step 6: Excel salvato in {out_path}")
|
|
|
|
# --- Conteggio needs_human_review ---
|
|
review_counts = df_out["final_needs_review"].value_counts(dropna=False)
|
|
true_count = review_counts.get(True, 0)
|
|
false_count = review_counts.get(False, 0)
|
|
print("\n--- Needs human review summary ---")
|
|
print(f"needs_human_review = True : {true_count}")
|
|
print(f"needs_human_review = False: {false_count}") |