Upload files to "/"

This commit is contained in:
Arianna Di Serio 2026-03-03 16:06:01 +01:00
parent 384672c974
commit 04055a36c7
1 changed files with 283 additions and 0 deletions

283
cosine.py Normal file
View File

@ -0,0 +1,283 @@
# --- Import librerie ---
import pandas as pd
from openai import AzureOpenAI
import pickle
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import openpyxl
import re
import json
from openpyxl.styles import PatternFill
from openpyxl import load_workbook
from collections import Counter
from prompts.prompt import build_prompt_local
import warnings
import logging
# --- Configurazione ---
endpoint = "https://gpt-sw-central-tap-security.openai.azure.com/"
deployment = "gpt-4o"
subscription_key = "8zufUIPs0Dijh0M6NpifkkDvxJHZMFtott7u8V8ySTYNcpYVoRbsJQQJ99BBACfhMk5XJ3w3AAABACOGr6sq"
client = AzureOpenAI(
azure_endpoint=endpoint,
api_key=subscription_key,
api_version="2024-05-01-preview",
)
# ----- Step 1: caricare datasets -----
df_labeled = pd.read_csv("main/datasets/annotated_dataset.csv", encoding="cp1252", sep=";")
df_unlabeled = pd.read_csv("main/datasets/unlabeled_dataset.csv", sep="\t", encoding="utf-8")
print(
"***STEP 1***\nDataset etichettato caricato. Numero righe:",
len(df_labeled),
"\nDataset non etichettato caricato. Numero righe:",
len(df_unlabeled),
)
def clean_id(x):
if pd.isna(x):
return ""
s = str(x)
m = re.search(r"\d+", s)
return m.group(0) if m else s.strip()
df_labeled["automation_id"] = df_labeled["automation_id"].apply(clean_id)
df_unlabeled["automation_id"] = df_unlabeled["automation_id"].apply(clean_id)
df_labeled["folder"] = df_labeled["folder"].astype(str).str.strip()
df_unlabeled["folder"] = df_unlabeled["folder"].astype(str).str.strip()
labeled_pairs = set(zip(df_labeled["automation_id"], df_labeled["folder"]))
df_unlabeled_filtered = df_unlabeled[
~df_unlabeled.apply(lambda row: (row["automation_id"], row["folder"]) in labeled_pairs, axis=1)
]
print("Automazioni non etichettate rimanenti dopo la pulizia:", len(df_unlabeled_filtered))
# ----- Step 2: embeddings -----
warnings.filterwarnings("ignore")
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
logging.getLogger("huggingface_hub").setLevel(logging.ERROR)
print("\n***Step 2***\nEmbeddings")
model = SentenceTransformer("all-MiniLM-L6-v2")
with open("main/labeled_embeddings.pkl", "rb") as f:
data = pickle.load(f)
embeddings = data["embeddings"].astype("float32")
print("Shape embeddings:", embeddings.shape)
# ⚠️ Cosine: normalizza i vettori
faiss.normalize_L2(embeddings)
# ----- Step 3: indice FAISS (Cosine via Inner Product) -----
dimension = embeddings.shape[1]
index = faiss.IndexFlatIP(dimension) # inner product su vettori normalizzati = cosine similarity
index.add(embeddings)
print(f"\n***Step 3: Indice FAISS creato***.\nNumero di vettori nell'indice: {index.ntotal}")
# ----- Step 4: Retrieval (similarità cosine) -----
k = 5
output_rows = []
df_sample = df_unlabeled_filtered.head(20).reset_index(drop=True) # SOLO prime 20
llm_rows = []
def sim_label(sim: float) -> str:
# sim è cosine similarity (più alto = più simile)
if sim >= 0.85:
return "Match forte"
elif sim >= 0.70:
return "Match plausibile"
elif sim >= 0.55:
return "Similarità instabile"
else:
return "Debole"
for count, (_, row) in enumerate(df_sample.iterrows(), start=1):
query_text = str(row["human_like"])
print("numero corrente:", count)
# embedding query + normalizzazione (cosine)
query_emb = model.encode([query_text], convert_to_numpy=True).astype("float32")
faiss.normalize_L2(query_emb)
# search: ritorna cosine similarity (inner product)
sims, indices = index.search(query_emb, k)
topk_cats = []
top1_sim = float(sims[0][0])
top1_similarity_label = sim_label(top1_sim)
for rank in range(k):
idx = int(indices[0][rank])
sim = float(sims[0][rank])
retrieved_row = df_labeled.iloc[idx]
topk_cats.append(str(retrieved_row.get("category", "")))
rank1_category = topk_cats[0] if topk_cats else ""
majority_category = Counter(topk_cats).most_common(1)[0][0] if topk_cats else ""
consistency = (sum(c == majority_category for c in topk_cats) / len(topk_cats)) if topk_cats else 0.0
# Salva analisi retrieval (opzionale)
for rank in range(k):
idx = int(indices[0][rank])
sim = float(sims[0][rank])
label = sim_label(sim)
retrieved_row = df_labeled.iloc[idx]
output_rows.append({
"automazione da etichettare": query_text,
"rank": rank + 1,
"retrieved_idx": idx,
"automazione simile": retrieved_row.get("automation", ""),
"categoria automazione simile": retrieved_row.get("category", ""),
"similarita_cosine": sim,
"similarity_label": label,
"rank1_similarity": top1_sim,
"rank1_similarity_label": top1_similarity_label,
"rank1_category": rank1_category,
"majority_category": majority_category,
"consistency": round(consistency, 3),
"top5_categories": " | ".join(topk_cats),
})
# ----- Step 5: invio dati al LLM -----
# NB: build_prompt_local deve usare la colonna "similarity" (non "distance").
retrieved = df_labeled.iloc[indices[0]].copy()
retrieved["similarity"] = sims[0].astype(float)
retrieved["similarity_label"] = retrieved["similarity"].apply(sim_label)
# Se nel prompt vuoi anche un numero "confidence", puoi usare direttamente similarity
retrieved["confidence"] = retrieved["similarity"]
prompt = build_prompt_local(query_text, retrieved, sim_label)
resp = client.chat.completions.create(
model=deployment,
messages=[
{"role": "system", "content": "Return ONLY valid JSON. No extra text."},
{"role": "user", "content": prompt},
],
temperature=0,
)
content = resp.choices[0].message.content.strip()
try:
parsed = json.loads(content)
except Exception:
parsed = {
"automation": query_text,
"category": "",
"subcategory": "",
"problem_type": "",
"gravity": "",
"scores": {},
"needs_human_review": True,
"short_rationale": f"JSON_PARSE_ERROR: {content[:200]}",
}
# ----- Normalizzazione output LLM + final labels -----
llm_category = str(parsed.get("category", "")).strip()
llm_subcategory = str(parsed.get("subcategory", "")).strip()
llm_problem_type = str(parsed.get("problem_type", "")).strip()
llm_gravity = str(parsed.get("gravity", "")).strip()
# Regola deterministica HARMLESS
if llm_category.upper() == "HARMLESS":
llm_subcategory = ""
llm_problem_type = "none"
llm_gravity = "NONE"
final_category = llm_category
final_subcategory = llm_subcategory
final_problem_type = llm_problem_type
final_gravity = llm_gravity
# ----- HUMAN REVIEW LOGIC (su SIMILARITÀ, non distanza) -----
needs_human_review = bool(parsed.get("needs_human_review", True))
# soglie cosine (da tarare)
OVERRIDE_MIN_SIMILARITY = 0.70
OVERRIDE_MIN_CONSISTENCY = 0.60
aligned_strong = (
final_category == majority_category
and final_category == rank1_category
and final_category != ""
)
good_retrieval = (top1_sim >= OVERRIDE_MIN_SIMILARITY) and (consistency >= OVERRIDE_MIN_CONSISTENCY)
if aligned_strong and good_retrieval:
needs_human_review = False
llm_rows.append({
"automation_id": row.get("automation_id", ""),
"folder": row.get("folder", ""),
"automation_text": query_text,
# Retrieval metrics (cosine)
"rank1_similarity": top1_sim,
"rank1_similarity_label": top1_similarity_label,
"rank1_category": rank1_category,
"majority_category": majority_category,
"consistency": round(consistency, 3),
"top5_categories": " | ".join(topk_cats),
# LLM raw
"llm_category": llm_category,
"llm_subcategory": llm_subcategory,
"llm_problem_type": llm_problem_type,
"llm_gravity": llm_gravity,
"llm_needs_human_review": bool(parsed.get("needs_human_review", True)),
# FINAL
"final_category": final_category,
"final_subcategory": final_subcategory,
"final_problem_type": final_problem_type,
"final_gravity": final_gravity,
"final_needs_human_review": needs_human_review,
"llm_rationale": parsed.get("short_rationale", ""),
})
# ----- Step 6: output Excel -----
df_llm = pd.DataFrame(llm_rows)
out_path = "main/datasets/labeling_first20_cosine.xlsx"
df_llm.to_excel(out_path, index=False)
wb = load_workbook(out_path)
ws = wb.active
true_fill = PatternFill(start_color="FF6347", end_color="FF6347", fill_type="solid") # rosso
false_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid") # verde
col_index = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}
for col_name in ["llm_needs_human_review", "final_needs_human_review"]:
if col_name in col_index:
c = col_index[col_name]
for r in range(2, ws.max_row + 1):
val = ws.cell(row=r, column=c).value
if val is True:
ws.cell(row=r, column=c).fill = true_fill
elif val is False:
ws.cell(row=r, column=c).fill = false_fill
wb.save(out_path)
print(f"\n***Step 6: Retrieval (cosine) + LLM ***\nExcel salvato in {out_path}")
review_counts = df_llm["final_needs_human_review"].value_counts(dropna=False)
print("\n--- Needs human review summary (final) ---")
print(f"True : {review_counts.get(True, 0)}")
print(f"False: {review_counts.get(False, 0)}")