#!/usr/bin/env python3
"""Aggressive entity deduplication -- layer 1 (deterministic).

There are currently ~34,355 entities; many of them are typographic variations,
role-prefixed names (Mr./Dr./Major), or OBJ-* ids generated per chunk instead
of per event. This script merges them in high-confidence layers:

  A. PROPER_NAME -- names that become identical once role prefixes are
     stripped. E.g. "Frank M. Brown", "Lt. Frank M. Brown" and
     "Special Agent Frank M. Brown" -> 1 canonical entity.

  B. UAP_OBJECT_BY_EVENT -- every OBJ-<EV>-<EVENT>-NN of the same event is
     collapsed into a single entity.

  C. EXACT_NORMALIZED -- after lowercasing, stripping punctuation, a leading
     stopword and a trailing suffix such as " UAP" / " incident", identical
     strings become 1 entity.

Layers A and C are implemented by one pass (``dedup_pass_proper_name``) and
layer B by ``dedup_pass_obj_by_event``. The console output labels the OBJ pass
"Pass A" and the name pass "Pass B/C".

For each cluster:
  - Choose the canonical entity: curated summary first, then most aliases,
    then most mentions, then alphabetical name (see ``choose_canonical``).
  - Union aliases[], mentioned_in[], text_mentioned_in[], referenced_by[]
    and related[].
  - Recompute signal_sources from the merged lists (db_chunks keeps the
    canonical's value because it depends on the entity_pk).
  - Move the duplicates to wiki/entities/_archived/.

Output: the list of merges (cluster -> canonical), to review before applying.

Run:
  python3 scripts/maintain/49_dedup_aggressive.py --dry-run
  python3 scripts/maintain/49_dedup_aggressive.py            # apply
"""
from __future__ import annotations

import argparse
import re
import shutil
import sys
import unicodedata
from collections import defaultdict
from pathlib import Path
from typing import Iterable

try:
    import yaml
except ImportError:
    # Keep the pure helpers importable without PyYAML; main() refuses to run.
    yaml = None

WIKI_ENT = Path("/Users/guto/ufo/wiki/entities")
ARCHIVED = WIKI_ENT / "_archived"

ROLE_PREFIX_RE = re.compile(
    r"^("
    r"mr|mrs|ms|dr|prof|sr|sra|sir|dame|lord|lady|"
    r"major|maj|colonel|col|lt|lieutenant|captain|capt|"
    r"general|gen|sergeant|sgt|corporal|cpl|private|pvt|admiral|adm|commander|cmdr|"
    r"agent|special agent|sa|director|deputy director|deputy|"
    r"reverend|rev|professor|"
    r"president|vice president|vp|chairman|secretary|"
    r"detective|det|inspector"
    r")\.?\s+",
    re.IGNORECASE,
)
STOPWORD_PREFIX_RE = re.compile(r"^(the|a|an|o|os|a|as|de|do|da|dos|das|of|los|las|el|la|le|les)\s+", re.IGNORECASE)
PUNCT_RE = re.compile(r"[.,;:!?\"'\(\)\[\]_\-]")
WS_RE = re.compile(r"\s+")
NOISE_SUFFIX_RE = re.compile(r"\s+(uap|incident|case|sighting|event|observation)$", re.IGNORECASE)
OBJ_ID_RE = re.compile(r"^OBJ-([A-Z0-9]+)-(.+?)-(\d{2})$")

# Front matter is delimited by lines consisting of '---' only, so a '---'
# inside a YAML value cannot truncate the block.
FRONT_MATTER_RE = re.compile(r"\A---[ \t]*\n(.*?)^---[ \t]*$", re.DOTALL | re.MULTILINE)

# A single-token normalized name must be at least this long to be clustered
# (keeps bare surnames such as "smith" / "brown" from colliding).
MIN_SINGLE_TOKEN_LEN = 8

# List-valued front-matter fields that merge_into() unions across a cluster.
_MERGED_LIST_FIELDS = (
    "aliases",
    "mentioned_in",
    "text_mentioned_in",
    "referenced_by",
    "related",
)


def ascii_fold(s: str) -> str:
    """Return *s* with combining marks removed after NFD decomposition."""
    return "".join(
        c for c in unicodedata.normalize("NFD", s)
        if not unicodedata.combining(c)
    )


def aggressive_normalize(name: str) -> str:
    """Return the comparison key used to cluster entity names.

    Steps, in order: ASCII-fold and lowercase; strip up to three leading role
    prefixes; strip one leading stopword; replace punctuation with spaces and
    collapse whitespace; strip one trailing noise suffix.

    Punctuation is removed only after the prefix steps, so an initial such as
    "A." is not mistaken for the stopword "a".
    """
    s = ascii_fold(name).strip().lower()
    # Role prefixes can stack ("Special Agent Major Brown"), hence the loop.
    for _ in range(3):
        new = ROLE_PREFIX_RE.sub("", s)
        if new == s:
            break
        s = new
    s = STOPWORD_PREFIX_RE.sub("", s)
    s = PUNCT_RE.sub(" ", s)
    s = WS_RE.sub(" ", s).strip()
    s = NOISE_SUFFIX_RE.sub("", s).strip()
    return s


# NOTE(review): not referenced anywhere in this file -- presumably kept for
# other tooling; confirm before removing.
FOLDER_TO_CLASS = {
    "people": "person",
    "organizations": "organization",
    "locations": "location",
    "events": "event",
    "uap-objects": "uap_object",
    "vehicles": "vehicle",
    "operations": "operation",
    "concepts": "concept",
}


def load_entity(path: Path) -> dict | None:
    """Parse a front-matter markdown file into an entity record.

    Returns a dict with keys ``path``, ``fm`` (front matter mapping), ``body``
    (everything after the closing ``---``) and ``raw`` (the full text).
    Returns ``None`` when the file has no front-matter block, when the front
    matter is not a mapping, or when the file cannot be read or parsed; read
    and parse failures are reported on stderr instead of being dropped
    silently.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        print(f"  ! skipping unreadable file {path}: {exc}", file=sys.stderr)
        return None

    match = FRONT_MATTER_RE.match(text)
    if match is None:
        return None

    try:
        fm = yaml.safe_load(match.group(1)) or {}
    except (yaml.YAMLError, ValueError) as exc:
        # ValueError: PyYAML raises it for out-of-range timestamps.
        print(f"  ! skipping invalid front matter in {path}: {exc}", file=sys.stderr)
        return None
    if not isinstance(fm, dict):
        # Callers use fm.get(); a scalar or list front matter is not an entity.
        return None

    return {"path": path, "fm": fm, "body": text[match.end():], "raw": text}


def dump_entity(entity: dict) -> str:
    """Serialize an entity record back to front-matter markdown text."""
    return "---\n" + yaml.safe_dump(
        entity["fm"], sort_keys=False, allow_unicode=True, width=1000
    ) + "---" + entity["body"]


def dedup_pass_obj_by_event(entities: list[dict]) -> dict[str, list[dict]]:
    """Cluster ``uap_object`` entities by their id with the ``-NN`` suffix dropped.

    The id is read from ``uap_object_id``, falling back to ``entity_id``, and
    must match ``OBJ_ID_RE``. Only clusters with more than one member are
    returned.
    """
    clusters: dict[str, list[dict]] = defaultdict(list)
    for e in entities:
        if e["fm"].get("entity_class") != "uap_object":
            continue
        eid = str(e["fm"].get("uap_object_id") or e["fm"].get("entity_id") or "")
        m = OBJ_ID_RE.match(eid)
        if not m:
            continue
        clusters[f"OBJ-{m.group(1)}-{m.group(2)}"].append(e)
    return {k: v for k, v in clusters.items() if len(v) > 1}


def dedup_pass_proper_name(entities: list[dict]) -> dict[str, list[dict]]:
    """Cluster named entities by ``aggressive_normalize(canonical_name)``.

    Applies to the classes person, organization, event, location, operation,
    concept and vehicle; entities only cluster within the same class (the key
    is ``"<class>::<normalized name>"``). A normalized name is used only when
    it has at least 2 tokens or at least ``MIN_SINGLE_TOKEN_LEN`` characters.
    Only clusters with more than one member are returned.
    """
    clusters: dict[str, list[dict]] = defaultdict(list)
    for e in entities:
        cls = e["fm"].get("entity_class")
        if cls not in ("person", "organization", "event", "location",
                       "operation", "concept", "vehicle"):
            continue
        name = e["fm"].get("canonical_name") or ""
        if not name:
            continue
        # YAML may parse a bare name such as 1947 as a non-string scalar.
        norm = aggressive_normalize(str(name))
        if not norm:
            continue
        if len(norm.split()) < 2 and len(norm) < MIN_SINGLE_TOKEN_LEN:
            continue
        clusters[f"{cls}::{norm}"].append(e)
    return {k: v for k, v in clusters.items() if len(v) > 1}


def choose_canonical(cluster: list[dict]) -> dict:
    """Pick the entity that survives a merge.

    Preference order: ``summary_status == "curated"``, then most aliases, then
    highest ``total_mentions``, then alphabetically first ``canonical_name``.
    Remaining ties are broken by path, so the choice does not depend on the
    order in which files were discovered.
    """
    def rank(e: dict) -> tuple:
        fm = e["fm"]
        curated = 1 if fm.get("summary_status") == "curated" else 0
        n_aliases = len(fm.get("aliases") or [])
        mentions = fm.get("total_mentions") or 0
        name = str(fm.get("canonical_name") or "")
        # Negate the "bigger is better" fields so a single min() can also
        # sort the name and path ascending.
        return (-curated, -n_aliases, -mentions, name, str(e.get("path", "")))

    return min(cluster, key=rank)


def merge_into(canonical: dict, duplicates: list[dict]) -> None:
    """Merge the front matter of *duplicates* into *canonical*, in place.

    - ``aliases``, ``mentioned_in``, ``text_mentioned_in``, ``referenced_by``
      and ``related`` become the sorted union over the whole cluster. Every
      ``canonical_name`` in the cluster, including the canonical's own, is
      added to ``aliases``.
    - ``documents_count`` is the number of distinct first path segments in
      ``mentioned_in`` (leading ``[`` stripped).
    - ``signal_sources`` page/text/cross refs are the sizes of the merged
      lists; ``db_chunks`` keeps the canonical's value.
    - ``total_mentions`` and ``signal_strength`` are recomputed from those
      counters.

    List entries are assumed to be hashable scalars.
    """
    cfm = canonical["fm"]

    merged = {field: set(cfm.get(field) or []) for field in _MERGED_LIST_FIELDS}
    merged["aliases"].add(cfm.get("canonical_name", ""))
    for d in duplicates:
        dfm = d["fm"]
        dup_name = dfm.get("canonical_name")
        if dup_name:
            merged["aliases"].add(dup_name)
        for field in _MERGED_LIST_FIELDS:
            merged[field].update(dfm.get(field) or [])
    merged["aliases"] -= {"", None}

    for field in _MERGED_LIST_FIELDS:
        # key=str keeps sorting total when YAML produced mixed scalar types
        # (e.g. an alias parsed as the int 1947 next to string aliases).
        cfm[field] = sorted(merged[field], key=str)

    cfm["documents_count"] = len(
        {str(m).split("/")[0].lstrip("[") for m in merged["mentioned_in"]}
    )

    sigs = cfm.get("signal_sources") or {"db_chunks": 0, "cross_refs": 0}
    sigs["page_refs"] = len(merged["mentioned_in"])
    sigs["text_refs"] = len(merged["text_mentioned_in"])
    sigs["cross_refs"] = len(merged["referenced_by"])
    # "or 0" also covers a key that is present but null in the YAML.
    sigs["db_chunks"] = int(sigs.get("db_chunks") or 0)
    cfm["signal_sources"] = sigs
    cfm["total_mentions"] = (
        sigs["db_chunks"] + sigs["page_refs"] + sigs["cross_refs"] + sigs["text_refs"]
    )

    if cfm["total_mentions"] == 0:
        cfm["signal_strength"] = "orphan"
    elif (sigs["db_chunks"] >= 3
          or sigs["page_refs"] >= 3
          or (sigs["db_chunks"] >= 1 and sigs["page_refs"] >= 1)
          or sigs["text_refs"] >= 5):
        cfm["signal_strength"] = "strong"
    else:
        cfm["signal_strength"] = "weak"


def archive_path(p: Path) -> Path:
    """Map a path under ``WIKI_ENT`` to the same relative path under ``ARCHIVED``."""
    rel = p.relative_to(WIKI_ENT)
    return ARCHIVED / rel


def _free_destination(dest: Path) -> Path:
    """Return *dest*, or a numbered sibling if *dest* already exists."""
    candidate = dest
    n = 1
    while candidate.exists():
        candidate = dest.with_name(f"{dest.stem}.{n}{dest.suffix}")
        n += 1
    return candidate


def _plan_merges(clusters: dict[str, list[dict]]) -> list[tuple[str, dict, list[dict]]]:
    """Turn clusters into (key, canonical, duplicates) plans, biggest first."""
    seen_paths: set[str] = set()
    plans: list[tuple[str, dict, list[dict]]] = []
    for ckey, cluster in clusters.items():
        # An entity claimed by an earlier cluster must not be merged twice.
        cluster = [e for e in cluster if str(e["path"]) not in seen_paths]
        if len(cluster) < 2:
            continue
        canonical = choose_canonical(cluster)
        duplicates = [e for e in cluster if e is not canonical]
        seen_paths.update(str(e["path"]) for e in cluster)
        plans.append((ckey, canonical, duplicates))
    plans.sort(key=lambda p: -len(p[2]))
    return plans


def _apply_plans(plans: list[tuple[str, dict, list[dict]]]) -> tuple[int, int]:
    """Write merged canonicals and archive duplicates; return both counts."""
    merged_count = 0
    archived_count = 0
    for _ckey, canonical, dupes in plans:
        merge_into(canonical, dupes)
        canonical["path"].write_text(dump_entity(canonical), encoding="utf-8")
        merged_count += 1
        for d in dupes:
            # Never overwrite a file archived by a previous run.
            archive_to = _free_destination(archive_path(d["path"]))
            archive_to.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(d["path"]), str(archive_to))
            archived_count += 1
    return merged_count, archived_count


def main() -> int:
    """Command-line entry point.

    Loads every ``type: entity`` markdown file under ``WIKI_ENT`` (skipping
    ``_archived``), runs the selected dedup passes, prints the merge plan and,
    unless ``--dry-run`` is given, applies it. Returns the process exit code.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--limit-pass", choices=["all", "obj", "name"], default="all")
    args = ap.parse_args()

    if yaml is None:
        print("PyYAML is required to run this script (pip install pyyaml)", file=sys.stderr)
        return 1

    print(f"Loading entities from {WIKI_ENT} ...")
    all_entities: list[dict] = []
    for f in WIKI_ENT.rglob("*.md"):
        if "_archived" in f.parts:
            continue
        ent = load_entity(f)
        if ent and ent["fm"].get("type") == "entity":
            all_entities.append(ent)
    print(f" loaded {len(all_entities)} entities")

    # Run dedup passes
    clusters: dict[str, list[dict]] = {}
    if args.limit_pass in ("all", "obj"):
        obj_clusters = dedup_pass_obj_by_event(all_entities)
        n_members = sum(len(v) for v in obj_clusters.values())
        print(f"\nPass A — OBJ by event: {len(obj_clusters)} clusters ({n_members} entities → {len(obj_clusters)} canonicals)")
        clusters.update({f"OBJ::{k}": v for k, v in obj_clusters.items()})
    if args.limit_pass in ("all", "name"):
        name_clusters = dedup_pass_proper_name(all_entities)
        n_members = sum(len(v) for v in name_clusters.values())
        print(f"Pass B/C — proper-name normalize: {len(name_clusters)} clusters ({n_members} entities → {len(name_clusters)} canonicals)")
        clusters.update({f"NAME::{k}": v for k, v in name_clusters.items()})

    plans = _plan_merges(clusters)
    redundant_total = sum(len(d) for _, _, d in plans)
    # Guard the percentage: nothing loaded (e.g. wrong WIKI_ENT) must not crash.
    reduction = 100 * redundant_total / len(all_entities) if all_entities else 0.0

    print("\n=== Merge plan ===")
    print(f" clusters: {len(plans)}")
    print(f" entities removed: {redundant_total}")
    print(f" before: {len(all_entities)} → after: {len(all_entities) - redundant_total}")
    print(f" reduction: {reduction:.1f}%\n")

    print("=== Top 20 biggest merges ===")
    for ckey, canonical, dupes in plans[:20]:
        cname = canonical["fm"].get("canonical_name", "?")
        print(f" {len(dupes)+1:>3} entities → '{cname}' ({ckey.split('::')[0]})")
        for d in dupes[:4]:
            print(f" ✗ {d['fm'].get('canonical_name', '?')}")
        if len(dupes) > 4:
            print(f" ... +{len(dupes)-4}")

    if args.dry_run:
        print("\n(dry-run; nothing written)")
        return 0

    # Apply merges
    print("\nApplying merges ...")
    merged_count, archived_count = _apply_plans(plans)
    print(f" canonicals updated: {merged_count}")
    print(f" duplicates archived: {archived_count}")
    return 0


if __name__ == "__main__":
    sys.exit(main())