#!/usr/bin/env python3
"""
42_sync_entity_stats.py — Bulletproof sync of every entity's reverse-reference signals.

Three independent signal sources exist for an entity. Until now the UI used
only one of them and showed "0 menções" ("0 mentions") whenever the others
disagreed. This script rebuilds them all in a single pass:

  1. wiki_page_refs    — pages whose entities_extracted{} lists this entity.
                         Materialised back into the entity's mentioned_in[].
  2. db_chunk_mentions — count of rows in public.entity_mentions joined to the
                         entity, plus the number of distinct documents those
                         chunks belong to. (Per the original author these rows
                         come from an ILIKE match on canonical_name + aliases;
                         that matching is NOT done by this script.)
  3. cross_entity_refs — reverse-links discovered by traversing other YAMLs,
                         e.g. an event's uap_objects[] / observers[] /
                         organizations_involved[]; a location's events_here[];
                         a document's key_entities[]. The entity's own outgoing
                         links in those fields are counted as well.

After scanning, each entity's frontmatter is rewritten with:

    mentioned_in: [...]          # the page refs, as [[doc/page]] wikilinks
    total_mentions: <int>        # max(db_chunk_mentions, len(mentioned_in))
    documents_count: <int>       # max(DB distinct docs, distinct docs in mentioned_in)
    signal_sources:
      db_chunks: <int>
      page_refs: <int>
      cross_refs: <int>
    referenced_by: [...]         # first 25 cross refs; key removed when there are none
    signal_strength: strong | weak | orphan
    last_lint: <UTC timestamp, YYYY-MM-DDTHH:MM:SSZ>

When all three signals are zero the entity is marked `orphan`. It is only
MOVED to wiki/entities/_archived/<folder>/<file>.md when --archive is given
(or --archive-only-junk, for orphans whose canonical_name looks like junk).
After a non-dry run that updated or archived anything, a summary block is
appended to wiki/log.md.

Re-running converges on the same values except for last_lint, which is
refreshed on every run. Safe to interrupt — each file is written to a
temporary sibling and then renamed over the original.
"""
from __future__ import annotations

import argparse
import json
import os
import re
import shutil
import sys
import unicodedata
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

# Third-party dependencies: exit with an install hint instead of a traceback.
try:
    import yaml
    import psycopg
except ImportError as e:
    sys.stderr.write(f"pip3 install pyyaml psycopg[binary] # missing: {e}\n")
    sys.exit(1)

# Project root, two levels above the directory that contains this script.
UFO_ROOT = Path(__file__).resolve().parents[2]
ENTITIES_BASE = UFO_ROOT / "wiki" / "entities"
ARCHIVED_BASE = UFO_ROOT / "wiki" / "entities" / "_archived"
PAGES_BASE = UFO_ROOT / "wiki" / "pages"
DOCS_BASE = UFO_ROOT / "wiki" / "documents"
LOG_PATH = UFO_ROOT / "wiki" / "log.md"

# Connection string; SUPABASE_DB_URL is the fallback when DATABASE_URL is unset.
DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL")

# Map plural folder names to the entity_class singular used in DB
FOLDER_TO_CLASS = {
    "people": "person",
    "organizations": "organization",
    "locations": "location",
    "events": "event",
    "uap-objects": "uap_object",
    "vehicles": "vehicle",
    "operations": "operation",
    "concepts": "concept",
}
# Inverse of FOLDER_TO_CLASS: entity_class -> folder name.
CLASS_TO_FOLDER = {v: k for k, v in FOLDER_TO_CLASS.items()}

# Frontmatter key that holds an entity's id, per entity_class. Callers fall
# back to the file stem when the key is missing or empty.
ID_FIELD_BY_CLASS = {
    "person": "person_id",
    "organization": "organization_id",
    "location": "location_id",
    "event": "event_id",
    "uap_object": "uap_object_id",
    "vehicle": "vehicle_id",
    "operation": "operation_id",
    "concept": "concept_id",
}

# Cross-entity fields that contain wikilinks pointing TO another entity.
CROSS_REF_FIELDS = {
    "event": [
        "uap_objects",
        "observers",
        "organizations_involved",
        "vehicles_involved",
        "witnesses_analyses",
        "preceded_by",
        "followed_by",
        "related_events",
        "documented_in",
        "primary_location",
    ],
    "location": ["events_here"],
    "uap_object": ["observed_in_event", "secondary_events"],
    "operation": ["documents"],
    # "document" is not an entity folder; collect_cross_refs() applies these
    # fields to the files under wiki/documents/.
    "document": ["key_entities", "key_events"],
}

WIKILINK_RE = re.compile(r"\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]")

# Wikilink prefix (singular, plural or short form) -> entity_class.
_PREFIX_TO_CLASS = {
    "people": "person",
    "person": "person",
    "org": "organization",
    "organization": "organization",
    "organizations": "organization",
    "loc": "location",
    "location": "location",
    "locations": "location",
    "event": "event",
    "events": "event",
    "uap": "uap_object",
    "uap_object": "uap_object",
    "uap-objects": "uap_object",
    "vehicle": "vehicle",
    "vehicles": "vehicle",
    "op": "operation",
    "operation": "operation",
    "operations": "operation",
    "concept": "concept",
    "concepts": "concept",
}

# YYYY, YYYY-MM or YYYY-MM-DD; the month and day groups are None when absent.
_EVENT_DATE_RE = re.compile(r"^(\d{4})(?:-(\d{2}))?(?:-(\d{2}))?$")

# Leading "EV-<year>-<month>-<day>-" of an event id. Every date part may be
# the "X" filler that event_id_from_entry() emits for unknown components.
_EVENT_ID_PREFIX_RE = re.compile(r"^EV-[\dX]{4}-[\dX]{2}-[\dX]{2}-")

_JUNK_NAMES = {"unknown", "none", "n/a", "na", "-", "—"}


def canonicalize_name(name: str) -> str:
    """Fold *name* to a kebab-case ASCII id.

    Accents are removed via NFKD decomposition, every character outside
    ``[a-z0-9-]`` becomes ``-``, runs of ``-`` collapse to one and the ends are
    trimmed. Ids that would start with a digit get an ``x-`` prefix. Falsy
    input yields ``""``. (The original author states this mirrors
    03-dedup-entities.py, so the algorithm is kept as is.)
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(name))
    folded = "".join(ch for ch in decomposed if not unicodedata.combining(ch)).lower()
    slug = re.sub(r"-+", "-", re.sub(r"[^a-z0-9-]", "-", folded)).strip("-")
    if slug and slug[0].isdigit():
        slug = "x-" + slug
    return slug


def event_id_from_entry(entry: dict) -> str | None:
    """Build the ``EV-YYYY-MM-DD-slug`` id for an extracted event entry.

    The label comes from ``entry["label"]`` or ``entry["name"]``; ``None`` is
    returned when neither is set. Unknown date components are written as
    ``XX`` (``XXXX`` for the year), and the slug is capped at 40 characters.
    (Stated by the original author to follow scripts/03-dedup-entities.py.)
    """
    label = entry.get("label") or entry.get("name")
    if not label:
        return None
    slug = canonicalize_name(label)[:40].strip("-") or "unlabeled"
    # str() also covers dates that the YAML loader already turned into objects.
    m = _EVENT_DATE_RE.match(str(entry.get("date") or "NA"))
    if not m:
        return f"EV-XXXX-XX-XX-{slug}"
    year, month, day = m.groups()
    return f"EV-{year}-{month or 'XX'}-{day or 'XX'}-{slug}"


def uap_object_id_from_event(event_id: str, index: int) -> str:
    """Return ``OBJ-EV<year>-<SLUG>-<NN>`` for the *index*-th object of an event.

    ``<SLUG>`` is the event slug with dashes removed, upper-cased and cut to
    20 characters (``UNK`` when empty). When *event_id* is not of the form
    ``EV-<y>-<m>-<d>-<slug>`` the result is ``OBJ-UNK-<NN>``.
    """
    event_short = "UNK"
    if event_id and event_id.startswith("EV-"):
        parts = event_id[3:].split("-", 3)
        if len(parts) == 4:
            year, _month, _day, slug = parts
            compact = slug.replace("-", "").upper()[:20] or "UNK"
            event_short = f"EV{year}-{compact}"
    return f"OBJ-{event_short}-{index:02d}"


def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into ``(frontmatter, body)``.

    Returns ``({}, raw_text)`` when the file has no frontmatter, when the
    closing ``---`` line is missing, when the YAML is invalid, or when the
    frontmatter is not a mapping. Otherwise the body is returned with its
    leading newlines removed.

    Raises:
        OSError / UnicodeDecodeError: if the file cannot be read as UTF-8.
    """
    raw = path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, raw
    # The closing fence must start a line. A bare find("---") would also hit
    # dashes inside a value, and a miss (-1) used to be sliced as if valid.
    close = raw.find("\n---", 3)
    if close == -1:
        return {}, raw
    try:
        fm = yaml.safe_load(raw[3:close]) or {}
    except yaml.YAMLError:
        return {}, raw
    if not isinstance(fm, dict):
        # A scalar or list frontmatter cannot carry entity fields.
        return {}, raw
    body = raw[close + 4:].lstrip("\n")
    return fm, body


def write_md(path: Path, fm: dict, body: str) -> None:
    """Atomic write so we never leave a half-written YAML.

    The content goes to ``<name>.tmp`` next to *path* and is then renamed over
    it. If anything fails the temporary file is removed before re-raising.
    """
    yaml_str = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    sep = "" if body.startswith("\n") else "\n"
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        tmp.write_text(f"---\n{yaml_str}---\n{sep}{body}", encoding="utf-8")
        tmp.replace(path)
    except BaseException:
        # Includes KeyboardInterrupt: do not leave a stale .tmp behind.
        tmp.unlink(missing_ok=True)
        raise


def parse_wikilink_target(s: str) -> tuple[str | None, str | None]:
    """Parse ``[[class/id]]``, ``[[class/id|label]]`` or bare ``class/id``.

    Returns ``(entity_class, id)``; the prefix may be any key of the prefix
    table (singular, plural or short form, case-insensitive). Returns
    ``(None, None)`` for non-strings, empty input, targets without ``/`` and
    unknown prefixes.
    """
    if not s or not isinstance(s, str):
        return None, None
    m = WIKILINK_RE.search(s)
    target = m.group(1).strip() if m else s.strip()
    prefix, slash, ident = target.partition("/")
    if not slash:
        return None, None
    cls = _PREFIX_TO_CLASS.get(prefix.lower())
    return (cls, ident.strip()) if cls else (None, None)


def _warn_skipped(path: Path, exc: Exception) -> None:
    """Report a file that could not be read instead of dropping it silently."""
    sys.stderr.write(f"warning: skipping unreadable {path}: {exc}\n")


def _as_list(value) -> list:
    """Return *value* when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _field_targets(fm: dict, fields) -> set[tuple[str, str]]:
    """Collect every parseable ``(class, id)`` wikilink target in *fields* of *fm*.

    A field may hold a single value or a list; non-string items are passed
    through ``str()`` before parsing.
    """
    found: set[tuple[str, str]] = set()
    for field in fields:
        val = fm.get(field)
        items = val if isinstance(val, list) else ([val] if val else [])
        for item in items:
            tgt_cls, tgt_id = parse_wikilink_target(item if isinstance(item, str) else str(item))
            if tgt_cls and tgt_id:
                found.add((tgt_cls, tgt_id))
    return found


def collect_page_refs() -> dict[tuple[str, str], set[str]]:
    """Scan ``wiki/pages/**/p*.md`` and index pages by the entities they extract.

    Each page's ``entities_extracted`` mapping (``{people: [...], events:
    [...], ...}``) is read and the page id ``<parent-dir>/<file-stem>`` is
    added to every entity it names.

    Returns:
        ``{(class, id): {page_id, ...}}``.
    """
    refs: dict[tuple[str, str], set[str]] = defaultdict(set)
    for page_path in PAGES_BASE.rglob("p*.md"):
        try:
            fm, _ = read_md(page_path)
        except Exception as exc:  # best effort: one bad page must not stop the scan
            _warn_skipped(page_path, exc)
            continue
        extracted = fm.get("entities_extracted") or {}
        if not isinstance(extracted, dict):
            continue

        # page_id like "doc-abc/p007"
        doc_id = page_path.parent.name
        page_id = f"{doc_id}/{page_path.stem}"

        # Events first: UAP objects on the same page are anchored to the
        # FIRST event (per the original author, mirrors 03-dedup-entities.py).
        page_event_ids: list[str] = []
        for entry in _as_list(extracted.get("events")):
            if not isinstance(entry, dict):
                continue
            event_id = event_id_from_entry(entry)
            if event_id:
                page_event_ids.append(event_id)
                refs[("event", event_id)].add(page_id)

        # Objects are numbered by position (1-based). Without an event on the
        # page, a placeholder event id derived from the document id is used.
        if page_event_ids:
            anchor_event = page_event_ids[0]
        else:
            anchor_event = f"EV-XXXX-XX-XX-{canonicalize_name(doc_id)[:30]}"
        for idx, _entry in enumerate(_as_list(extracted.get("uap_objects")), start=1):
            refs[("uap_object", uap_object_id_from_event(anchor_event, idx))].add(page_id)

        # Every other class is handled generically (name-based).
        for folder, entries in extracted.items():
            cls = FOLDER_TO_CLASS.get(folder)
            if not cls or cls in {"event", "uap_object"} or not isinstance(entries, list):
                continue
            for entry in entries:
                eid = None
                if isinstance(entry, str):
                    _, parsed_eid = parse_wikilink_target(entry)
                    eid = parsed_eid or canonicalize_name(entry)
                elif isinstance(entry, dict):
                    eid = (
                        entry.get("id")
                        or entry.get(ID_FIELD_BY_CLASS.get(cls, "id"))
                        or canonicalize_name(entry.get("name", ""))
                    )
                if eid:
                    refs[(cls, eid)].add(page_id)
                if isinstance(entry, dict):
                    # Aliases are indexed under their own canonical ids too.
                    for alias in entry.get("aliases") or []:
                        alias_id = canonicalize_name(alias)
                        if alias_id and alias_id != eid:
                            refs[(cls, alias_id)].add(page_id)
    return refs


def collect_cross_refs() -> dict[tuple[str, str], set[str]]:
    """Sweep entity and document YAMLs for wikilinks that point at entities.

    When entity X declares e.g. ``uap_objects: ["[[uap/OBJ-...]]"]``, the
    target ``OBJ-...`` gets ``"<class-of-X>/<id-of-X>"`` as a back-reference.
    Files under ``wiki/documents/`` contribute ``"document/<stem>"`` for every
    field listed in ``CROSS_REF_FIELDS["document"]``.

    Returns:
        ``{(target_class, target_id): {source_ref, ...}}``.
    """
    refs: dict[tuple[str, str], set[str]] = defaultdict(set)

    for folder, cls in FOLDER_TO_CLASS.items():
        cls_dir = ENTITIES_BASE / folder
        if not cls_dir.is_dir():
            continue
        id_field = ID_FIELD_BY_CLASS.get(cls)
        fields = CROSS_REF_FIELDS.get(cls, [])
        for ent_path in cls_dir.glob("*.md"):
            try:
                fm, _ = read_md(ent_path)
            except Exception as exc:
                _warn_skipped(ent_path, exc)
                continue
            source = f"{cls}/{fm.get(id_field) or ent_path.stem}"
            for target in _field_targets(fm, fields):
                refs[target].add(source)

    # Documents: use the declared field list (key_entities AND key_events)
    # rather than a hard-coded subset of it.
    doc_fields = CROSS_REF_FIELDS.get("document", [])
    for doc_path in DOCS_BASE.glob("*.md"):
        try:
            fm, _ = read_md(doc_path)
        except Exception as exc:
            _warn_skipped(doc_path, exc)
            continue
        source = f"document/{doc_path.stem}"
        for target in _field_targets(fm, doc_fields):
            refs[target].add(source)
    return refs


def collect_db_mentions(conn) -> dict[tuple[str, str], tuple[int, int]]:
    """Return ``{(class, id): (chunk_count, doc_count)}`` from public.entity_mentions.

    Every row of ``public.entities`` is returned; entities without mentions
    get ``(0, 0)`` because of the LEFT JOINs. *conn* must provide
    ``cursor()`` as a context manager (a psycopg connection in ``main``).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT e.entity_class,
                   e.entity_id,
                   COUNT(em.chunk_pk)::int AS chunks,
                   COUNT(DISTINCT c.doc_id)::int AS docs
            FROM public.entities e
            LEFT JOIN public.entity_mentions em ON em.entity_pk = e.entity_pk
            LEFT JOIN public.chunks c ON c.chunk_pk = em.chunk_pk
            GROUP BY e.entity_class, e.entity_id
            """
        )
        return {
            (cls, eid): (chunks or 0, docs or 0)
            for cls, eid, chunks, docs in cur.fetchall()
        }


def signal_strength(db_chunks: int, page_refs: int, cross_refs: int) -> str:
    """Classify an entity by its three signal counts.

    ``orphan`` when all are zero; ``strong`` when there are at least 3 DB
    chunks, at least 3 page refs, or at least one of each; ``weak`` otherwise
    (cross refs alone never make an entity strong).
    """
    if db_chunks + page_refs + cross_refs == 0:
        return "orphan"
    if db_chunks >= 3 or page_refs >= 3 or (db_chunks >= 1 and page_refs >= 1):
        return "strong"
    return "weak"


def archive_entity(path: Path, dry_run: bool, archived_count: list[int]) -> None:
    """Move an entity file to the same relative path under ARCHIVED_BASE.

    ``archived_count[0]`` is incremented even in dry-run mode, where nothing
    is moved. The file is moved as it is on disk; frontmatter changes that
    exist only in memory are not written to the archived copy.
    """
    target = ARCHIVED_BASE / path.relative_to(ENTITIES_BASE)
    archived_count[0] += 1
    if dry_run:
        return
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(path), str(target))


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line interface of this script."""
    p = argparse.ArgumentParser()
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--archive", action="store_true",
                   help="actually move orphans to wiki/entities/_archived/. "
                        "By default we only mark them — data is never lost.")
    p.add_argument("--archive-only-junk", action="store_true",
                   help="archive ONLY entities whose canonical_name is <=3 chars, "
                        "purely numeric, or matches obvious junk patterns")
    p.add_argument("--fix-obj-names", action="store_true",
                   help="rewrite OBJ-* canonical_name to ' UAP', "
                        "moving the full shape description to aliases")
    p.add_argument("--verbose", action="store_true")
    return p


def _is_junk_name(canonical_name) -> bool:
    """True for names of at most 3 characters, digit/punctuation-only names, or known placeholders."""
    cn = str(canonical_name or "").strip()
    return (
        len(cn) <= 3
        or re.fullmatch(r"[0-9.()-]+", cn) is not None
        or cn.lower() in _JUNK_NAMES
    )


def _fix_obj_name(fm: dict, eid) -> None:
    """Replace a generated ``"... UAP (OBJ-...)"`` canonical_name with a short one.

    The new name is derived from the slug of the linked ``observed_in_event``;
    the old name is kept as the first alias. *fm* is modified in place and is
    left untouched when the name does not match the pattern or no event link
    can be parsed.
    """
    cn = str(fm.get("canonical_name") or "")
    # A name that embeds the raw id in parentheses is the generated pattern
    # this option is meant to clean up.
    if "UAP (OBJ-" not in cn or not cn.endswith(")"):
        return
    obs_event = fm.get("observed_in_event")
    if isinstance(obs_event, list):
        # The field may be written as a list; use its first entry.
        obs_event = obs_event[0] if obs_event else ""
    event_cls, event_id = parse_wikilink_target(obs_event or "")
    if event_cls != "event" or not event_id:
        return
    slug = _EVENT_ID_PREFIX_RE.sub("", event_id)
    new_name = slug.replace("-", " ").strip() or str(eid)
    new_name = new_name[:1].upper() + new_name[1:] + " UAP"
    raw_aliases = fm.get("aliases") or []
    aliases = [raw_aliases] if isinstance(raw_aliases, str) else list(raw_aliases)
    if cn not in aliases:
        aliases.insert(0, cn)
    fm["canonical_name"] = new_name
    fm["aliases"] = aliases


def main() -> int:
    """Recompute the signal fields of every entity file and report totals.

    Returns the process exit code: 1 when no database URL is configured,
    0 otherwise.
    """
    args = _build_arg_parser().parse_args()

    print(f"scanning {ENTITIES_BASE} ...")
    if not DATABASE_URL:
        sys.stderr.write("DATABASE_URL not set — cannot read DB mentions\n")
        return 1

    print("collecting page refs from wiki/pages/ ...")
    page_refs = collect_page_refs()
    print(f" {len(page_refs)} entities referenced from {sum(len(v) for v in page_refs.values())} page rows")

    print("collecting cross-entity refs ...")
    cross_refs = collect_cross_refs()
    print(f" {len(cross_refs)} entities back-linked")

    print("reading DB entity_mentions ...")
    with psycopg.connect(DATABASE_URL) as conn:
        db_counts = collect_db_mentions(conn)
    print(f" {len(db_counts)} entities in DB")

    # Walk every entity YAML on disk
    archived_count = [0]
    stats = {"strong": 0, "weak": 0, "orphan": 0, "updated": 0, "skipped": 0}
    for folder, cls in FOLDER_TO_CLASS.items():
        cls_dir = ENTITIES_BASE / folder
        if not cls_dir.is_dir():
            continue
        id_field = ID_FIELD_BY_CLASS.get(cls)
        own_fields = CROSS_REF_FIELDS.get(cls, [])
        # Snapshot the listing first: files are rewritten and possibly moved
        # while we loop. Sorting keeps the output order stable.
        for ent_path in sorted(cls_dir.glob("*.md")):
            try:
                fm, body = read_md(ent_path)
            except Exception as exc:
                _warn_skipped(ent_path, exc)
                stats["skipped"] += 1
                continue
            if not fm:
                stats["skipped"] += 1
                continue

            eid = fm.get(id_field) or ent_path.stem
            key = (cls, eid)
            db_chunks, db_docs = db_counts.get(key, (0, 0))
            page_list = sorted(page_refs.get(key, set()))

            # The entity's OWN outgoing wikilinks count as signal too: an OBJ
            # whose observed_in_event points at an event is anchored even
            # when nothing links back to it.
            own_outgoing = {f"{c}/{i}" for c, i in _field_targets(fm, own_fields)}
            all_cross = sorted(cross_refs.get(key, set()) | own_outgoing)

            strength = signal_strength(db_chunks, len(page_list), len(all_cross))
            stats[strength] += 1

            if args.fix_obj_names and cls == "uap_object":
                _fix_obj_name(fm, eid)

            # Mutate frontmatter — preserve unrelated keys.
            fm["mentioned_in"] = [f"[[{p}]]" for p in page_list]
            fm["total_mentions"] = max(db_chunks, len(page_list))
            fm["documents_count"] = max(db_docs, len({p.split("/", 1)[0] for p in page_list}))
            fm["signal_sources"] = {
                "db_chunks": int(db_chunks),
                "page_refs": len(page_list),
                "cross_refs": len(all_cross),
            }
            if all_cross:
                fm["referenced_by"] = [f"[[{r}]]" for r in all_cross[:25]]
            else:
                fm.pop("referenced_by", None)
            fm["signal_strength"] = strength
            fm["last_lint"] = utc_iso()

            # Optional archive paths — by default we KEEP everything, only mark.
            if strength == "orphan" and (
                args.archive
                or (args.archive_only_junk and _is_junk_name(fm.get("canonical_name")))
            ):
                archive_entity(ent_path, args.dry_run, archived_count)
                continue

            stats["updated"] += 1
            if args.verbose:
                # Report the same cross-ref count that drove `strength`.
                print(f" {strength:7} {cls}/{eid} db={db_chunks} pages={len(page_list)} cross={len(all_cross)}")
            if not args.dry_run:
                write_md(ent_path, fm, body)

    print()
    print(f" strong: {stats['strong']:>6}")
    print(f" weak: {stats['weak']:>6}")
    print(f" orphan: {stats['orphan']:>6} (archived: {archived_count[0]})")
    print(f" updated: {stats['updated']:>6}")
    print(f" skipped: {stats['skipped']:>6}")
    print(f" dry-run: {args.dry_run}")

    if not args.dry_run and (stats["updated"] > 0 or archived_count[0] > 0):
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        with LOG_PATH.open("a", encoding="utf-8") as f:
            f.write(
                f"\n## {utc_iso()} · SYNC_ENTITY_STATS\n"
                f"- script: scripts/maintain/42_sync_entity_stats.py\n"
                f"- strong: {stats['strong']}\n"
                f"- weak: {stats['weak']}\n"
                f"- orphan: {stats['orphan']} (archived: {archived_count[0]})\n"
                f"- updated: {stats['updated']}\n"
            )
    return 0


if __name__ == "__main__":
    sys.exit(main())