# NOTE(review): this file arrived as a unified git diff collapsed onto a few
# physical lines; below is the reconstructed, cleaned-up Python module.
#
# Pipeline: load a labeled + an unlabeled dataset, retrieve the top-k most
# similar labeled automations via FAISS cosine similarity, ask an Azure
# OpenAI model to classify each unlabeled automation, then export the
# results to a color-coded Excel file.

# --- Library imports ---
import os
import pickle
import re
import json
import warnings
import logging
from collections import Counter

import pandas as pd
import numpy as np
import faiss
import openpyxl
from openpyxl.styles import PatternFill
from openpyxl import load_workbook
from openai import AzureOpenAI
from sentence_transformers import SentenceTransformer

from prompts.prompt import build_prompt_local


# --- Configuration ---
# SECURITY FIX: the API key used to be hardcoded here, which means it is in
# version-control history and must be considered compromised — rotate it.
# The key is now read from the environment; this fails fast if it is unset.
endpoint = "https://gpt-sw-central-tap-security.openai.azure.com/"
deployment = "gpt-4o"
subscription_key = os.environ["AZURE_OPENAI_API_KEY"]

client = AzureOpenAI(
    azure_endpoint=endpoint,
    api_key=subscription_key,
    api_version="2024-05-01-preview",
)

# ----- Step 1: load the datasets -----
df_labeled = pd.read_csv("main/datasets/annotated_dataset.csv", encoding="cp1252", sep=";")
df_unlabeled = pd.read_csv("main/datasets/unlabeled_dataset.csv", sep="\t", encoding="utf-8")
print(
    "***STEP 1***\nDataset etichettato caricato. Numero righe:",
    len(df_labeled),
    "\nDataset non etichettato caricato. Numero righe:",
    len(df_unlabeled),
)


def clean_id(x):
    """Normalize an automation id to a comparable string.

    Extracts the first run of digits if present, otherwise returns the
    stripped string; NaN becomes "" so (id, folder) pairs compare cleanly.
    """
    if pd.isna(x):
        return ""
    s = str(x)
    m = re.search(r"\d+", s)
    return m.group(0) if m else s.strip()


df_labeled["automation_id"] = df_labeled["automation_id"].apply(clean_id)
df_unlabeled["automation_id"] = df_unlabeled["automation_id"].apply(clean_id)
df_labeled["folder"] = df_labeled["folder"].astype(str).str.strip()
df_unlabeled["folder"] = df_unlabeled["folder"].astype(str).str.strip()

# Drop unlabeled rows whose (automation_id, folder) pair is already labeled.
labeled_pairs = set(zip(df_labeled["automation_id"], df_labeled["folder"]))
df_unlabeled_filtered = df_unlabeled[
    ~df_unlabeled.apply(lambda row: (row["automation_id"], row["folder"]) in labeled_pairs, axis=1)
]
print("Automazioni non etichettate rimanenti dopo la pulizia:", len(df_unlabeled_filtered))


# ----- Step 2: embeddings -----
warnings.filterwarnings("ignore")
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
logging.getLogger("huggingface_hub").setLevel(logging.ERROR)

print("\n***Step 2***\nEmbeddings")
model = SentenceTransformer("all-MiniLM-L6-v2")

# Precomputed embeddings of the labeled dataset; assumed to be row-aligned
# with df_labeled (positional .iloc lookups below rely on this) — TODO confirm.
with open("main/labeled_embeddings.pkl", "rb") as f:
    data = pickle.load(f)

embeddings = data["embeddings"].astype("float32")
print("Shape embeddings:", embeddings.shape)

# Cosine similarity via inner product requires L2-normalized vectors.
faiss.normalize_L2(embeddings)


# ----- Step 3: FAISS index (cosine via inner product) -----
dimension = embeddings.shape[1]
index = faiss.IndexFlatIP(dimension)  # IP on normalized vectors == cosine similarity
index.add(embeddings)
print(f"\n***Step 3: Indice FAISS creato***.\nNumero di vettori nell'indice: {index.ntotal}")


# ----- Step 4: retrieval (cosine similarity) -----
k = 5
output_rows = []  # per-rank retrieval audit trail (NOTE(review): built but never exported — confirm if intentional)
df_sample = df_unlabeled_filtered.head(20).reset_index(drop=True)  # first 20 rows only
llm_rows = []

# Cosine thresholds for overriding the LLM's human-review flag (to be tuned).
# Hoisted out of the loop: they are loop-invariant.
OVERRIDE_MIN_SIMILARITY = 0.70
OVERRIDE_MIN_CONSISTENCY = 0.60


def sim_label(sim: float) -> str:
    """Map a cosine similarity (higher = more similar) to a coarse bucket."""
    if sim >= 0.85:
        return "Match forte"
    elif sim >= 0.70:
        return "Match plausibile"
    elif sim >= 0.55:
        return "Similarità instabile"
    else:
        return "Debole"


for count, (_, row) in enumerate(df_sample.iterrows(), start=1):
    query_text = str(row["human_like"])
    print("numero corrente:", count)

    # Embed the query and normalize it so inner product == cosine similarity.
    query_emb = model.encode([query_text], convert_to_numpy=True).astype("float32")
    faiss.normalize_L2(query_emb)

    # search() returns cosine similarities (inner product), best first.
    sims, indices = index.search(query_emb, k)
    top_sims = sims[0]
    top_idxs = indices[0]

    # Categories of the retrieved neighbors, in rank order.
    topk_cats = [str(df_labeled.iloc[int(i)].get("category", "")) for i in top_idxs]

    top1_sim = float(top_sims[0])
    top1_similarity_label = sim_label(top1_sim)
    rank1_category = topk_cats[0] if topk_cats else ""
    majority_category = Counter(topk_cats).most_common(1)[0][0] if topk_cats else ""
    consistency = (sum(c == majority_category for c in topk_cats) / len(topk_cats)) if topk_cats else 0.0

    # Retrieval audit rows: one record per rank (previously done with two
    # redundant loops over the same results; merged into one).
    for rank in range(k):
        idx = int(top_idxs[rank])
        sim = float(top_sims[rank])
        retrieved_row = df_labeled.iloc[idx]

        output_rows.append({
            "automazione da etichettare": query_text,
            "rank": rank + 1,
            "retrieved_idx": idx,
            "automazione simile": retrieved_row.get("automation", ""),
            "categoria automazione simile": retrieved_row.get("category", ""),
            "similarita_cosine": sim,
            "similarity_label": sim_label(sim),

            "rank1_similarity": top1_sim,
            "rank1_similarity_label": top1_similarity_label,
            "rank1_category": rank1_category,
            "majority_category": majority_category,
            "consistency": round(consistency, 3),
            "top5_categories": " | ".join(topk_cats),
        })

    # ----- Step 5: send data to the LLM -----
    # NB: build_prompt_local must consume the "similarity" column (not "distance").
    retrieved = df_labeled.iloc[top_idxs].copy()
    retrieved["similarity"] = top_sims.astype(float)
    retrieved["similarity_label"] = retrieved["similarity"].apply(sim_label)
    # The similarity doubles as a numeric "confidence" for the prompt.
    retrieved["confidence"] = retrieved["similarity"]

    prompt = build_prompt_local(query_text, retrieved, sim_label)

    resp = client.chat.completions.create(
        model=deployment,
        messages=[
            {"role": "system", "content": "Return ONLY valid JSON. No extra text."},
            {"role": "user", "content": prompt},
        ],
        temperature=0,
    )
    content = resp.choices[0].message.content.strip()

    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        # Malformed model output: record a placeholder flagged for human review.
        parsed = {
            "automation": query_text,
            "category": "",
            "subcategory": "",
            "problem_type": "",
            "gravity": "",
            "scores": {},
            "needs_human_review": True,
            "short_rationale": f"JSON_PARSE_ERROR: {content[:200]}",
        }

    # ----- Normalize LLM output + final labels -----
    llm_category = str(parsed.get("category", "")).strip()
    llm_subcategory = str(parsed.get("subcategory", "")).strip()
    llm_problem_type = str(parsed.get("problem_type", "")).strip()
    llm_gravity = str(parsed.get("gravity", "")).strip()

    # Deterministic HARMLESS rule: harmless automations carry no sub-fields.
    if llm_category.upper() == "HARMLESS":
        llm_subcategory = ""
        llm_problem_type = "none"
        llm_gravity = "NONE"

    final_category = llm_category
    final_subcategory = llm_subcategory
    final_problem_type = llm_problem_type
    final_gravity = llm_gravity

    # ----- Human-review logic (based on SIMILARITY, not distance) -----
    # Default to the LLM's own flag (True if absent); clear it only when the
    # LLM agrees with both the rank-1 and the majority category AND the
    # retrieval itself is strong and consistent.
    needs_human_review = bool(parsed.get("needs_human_review", True))

    aligned_strong = (
        final_category == majority_category
        and final_category == rank1_category
        and final_category != ""
    )
    good_retrieval = (top1_sim >= OVERRIDE_MIN_SIMILARITY) and (consistency >= OVERRIDE_MIN_CONSISTENCY)

    if aligned_strong and good_retrieval:
        needs_human_review = False

    llm_rows.append({
        "automation_id": row.get("automation_id", ""),
        "folder": row.get("folder", ""),
        "automation_text": query_text,

        # Retrieval metrics (cosine)
        "rank1_similarity": top1_sim,
        "rank1_similarity_label": top1_similarity_label,
        "rank1_category": rank1_category,
        "majority_category": majority_category,
        "consistency": round(consistency, 3),
        "top5_categories": " | ".join(topk_cats),

        # LLM raw
        "llm_category": llm_category,
        "llm_subcategory": llm_subcategory,
        "llm_problem_type": llm_problem_type,
        "llm_gravity": llm_gravity,
        "llm_needs_human_review": bool(parsed.get("needs_human_review", True)),

        # FINAL
        "final_category": final_category,
        "final_subcategory": final_subcategory,
        "final_problem_type": final_problem_type,
        "final_gravity": final_gravity,
        "final_needs_human_review": needs_human_review,

        "llm_rationale": parsed.get("short_rationale", ""),
    })


# ----- Step 6: Excel output -----
df_llm = pd.DataFrame(llm_rows)
out_path = "main/datasets/labeling_first20_cosine.xlsx"
df_llm.to_excel(out_path, index=False)

# Re-open the workbook to color the boolean review columns:
# True (needs review) -> red, False -> green.
wb = load_workbook(out_path)
ws = wb.active

true_fill = PatternFill(start_color="FF6347", end_color="FF6347", fill_type="solid")   # red
false_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid")  # green

# Map header name -> 1-based column index from the first worksheet row.
col_index = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)}

for col_name in ["llm_needs_human_review", "final_needs_human_review"]:
    if col_name in col_index:
        c = col_index[col_name]
        for r in range(2, ws.max_row + 1):
            val = ws.cell(row=r, column=c).value
            # Identity checks on purpose: only genuine booleans get colored.
            if val is True:
                ws.cell(row=r, column=c).fill = true_fill
            elif val is False:
                ws.cell(row=r, column=c).fill = false_fill

wb.save(out_path)
print(f"\n***Step 6: Retrieval (cosine) + LLM ***\nExcel salvato in {out_path}")

review_counts = df_llm["final_needs_human_review"].value_counts(dropna=False)
print("\n--- Needs human review summary (final) ---")
print(f"True : {review_counts.get(True, 0)}")
print(f"False: {review_counts.get(False, 0)}")