#!/usr/bin/env python3
"""
57_load_relations_from_json.py — Build typed relations for public.relations from
the reextract data, using ONLY verifiable references (Locard / absolute
provenance — no fuzzy guessing).

Two sources, combined and deduped:

  A. STRUCTURAL relations derived from each
     raw/<doc_id>--subagent/_reextract.json events[] (deterministic event_id,
     names resolved against real entities):
       - event.observers[]             → (person, witnessed, event)
       - event (each)                  → (event, documented_in, document)
       - event.uap_objects_observed    → (event, involves_uap, uap_object)
       - event.primary_location_name   → (event, occurred_at, location)
           [substring match against this doc's locations[]]
       - people[] (each)               → (person, mentioned_by, document)

  B. EXPLICIT relations[] from the same JSON that resolve EXACTLY (both
     endpoints found in the entity name→id index): captures person↔org
     (employed_by, signed, authored, commanded), etc.

ID generation mirrors scripts/synthesize/30_rebuild_wiki_from_reextract.py so
event_id / person_id / uap_object_id match the entities table exactly.

Malformed input (non-dict entries, non-string names, unreadable JSON files) is
skipped rather than aborting the run; unreadable files are reported on stderr.

Run (DATABASE_URL, or SUPABASE_DB_URL as a fallback, must point at the target
Postgres):

    DATABASE_URL=postgresql://... python3 scripts/maintain/57_load_relations_from_json.py [--truncate]
"""
from __future__ import annotations

import json
import os
import re
import sys
import unicodedata
from pathlib import Path

# NOTE: the third-party dependencies (psycopg, yaml) are imported inside the
# functions that use them so the pure ID / relation helpers below can be
# imported and unit-tested without a database driver installed.

UFO = Path(os.environ.get("UFO_ROOT", "/Users/guto/ufo"))
RAW = UFO / "raw"
ENT = UFO / "wiki" / "entities"
CLASS_DIR = {
    "person": "people",
    "organization": "organizations",
    "location": "locations",
    "event": "events",
    "uap_object": "uap-objects",
}

# Single source of truth for the relation types that are inserted; everything
# else is reported as dropped by load_rows().
VALID_RELATION_TYPES = (
    "witnessed",
    "occurred_at",
    "involves_uap",
    "documented_in",
    "authored",
    "signed",
    "mentioned_by",
    "employed_by",
    "operated_by",
    "investigated",
    "commanded",
    "related_to",
    "similar_to",
    "precedes",
    "follows",
)

# Column order shared by the COPY and the INSERT ... SELECT; it is also the
# layout of every row tuple produced by relations_for_doc().
RELATION_COLUMNS = (
    "source_class, source_id, relation_type, target_class, target_id, "
    "evidence_ref, confidence, extracted_by"
)

EXTRACTED_BY = "reextract"


# ── ID generation (mirror of synthesize/30) ─────────────────────────────────
def canonicalize_name(name: str) -> str:
    """Return the lowercase ASCII slug for *name* ("" for empty input).

    Combining marks are stripped after NFD normalisation, every character
    outside ``[a-z0-9-]`` becomes ``-``, runs of ``-`` collapse, and a slug that
    starts with a digit is prefixed with ``x-``.
    """
    if not name:
        return ""
    nfd = unicodedata.normalize("NFD", name)
    ascii_str = "".join(c for c in nfd if not unicodedata.combining(c))
    low = ascii_str.lower()
    rep = re.sub(r"[^a-z0-9-]", "-", low)
    col = re.sub(r"-+", "-", rep).strip("-")
    if col and col[0].isdigit():
        col = "x-" + col
    return col


def event_id_from(label: str, date_start: str | None) -> str:
    """Build the deterministic ``EV-YYYY-MM-DD-slug`` id for an event.

    Missing date components are written as ``XX`` (``XXXX`` for the year); the
    slug is the canonicalised label truncated to 40 characters, or
    ``unlabeled`` when that is empty.
    """
    slug = canonicalize_name(label or "")[:40].strip("-") or "unlabeled"
    date = date_start or ""
    m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", date)
    if m:
        return f"EV-{m.group(1)}-{m.group(2)}-{m.group(3)}-{slug}"
    m = re.match(r"^(\d{4})-(\d{2})$", date)
    if m:
        return f"EV-{m.group(1)}-{m.group(2)}-XX-{slug}"
    m = re.match(r"^(\d{4})$", date)
    if m:
        return f"EV-{m.group(1)}-XX-XX-{slug}"
    return f"EV-XXXX-XX-XX-{slug}"


def uap_object_id(event_id: str, index: int) -> str:
    """Build the ``OBJ-EV<year>-<COMPACTSLUG>-<nn>`` id of an event's n-th object.

    Falls back to ``OBJ-UNK-<nn>`` when *event_id* is not of the
    ``EV-<y>-<m>-<d>-<slug>`` form.
    """
    if event_id.startswith("EV-"):
        parts = event_id[3:].split("-", 4)
        if len(parts) >= 4:
            year = parts[0]
            slug = "-".join(parts[3:])
            compact = re.sub(r"[^A-Z0-9]", "", slug.upper())[:20] or "UNK"
            return f"OBJ-EV{year}-{compact}-{index:02d}"
    return f"OBJ-UNK-{index:02d}"


def lower(s: str) -> str:
    """Return *s* stripped and lowercased; "" for None or any non-string value."""
    if not isinstance(s, str):
        return ""
    return s.strip().lower()


def parse_frontmatter(path: Path) -> dict | None:
    """Return the YAML frontmatter mapping of *path*.

    Returns ``{}`` when the frontmatter is empty and ``None`` when the file
    cannot be read, does not start with ``---``, fails to parse, or parses to
    something that is not a mapping.
    """
    # Imported outside the try block: a missing PyYAML must fail loudly instead
    # of being swallowed and silently turning every entity into a stem-only key.
    import yaml

    try:
        text = path.read_text(encoding="utf-8")
        if not text.startswith("---"):
            return None
        data = yaml.safe_load(text.split("---", 2)[1])
    except Exception:  # deliberate best effort: one bad file must not stop the run
        return None
    if not data:
        return {}
    return data if isinstance(data, dict) else None


def index_entity_names(names: dict[str, str], eid: str, fm: dict | None) -> None:
    """Register one entity's lookup keys in *names* (first writer wins).

    Without usable frontmatter the entity id itself is the only key. Otherwise
    ``canonical_name`` and every string in ``aliases`` are registered,
    lowercased; non-string entries are ignored, and a scalar string ``aliases``
    value is treated as a single alias.
    """
    if not fm:
        names.setdefault(eid, eid)
        return
    # NOTE(review): an entity whose frontmatter has neither canonical_name nor
    # aliases gets no key at all, so it is also absent from entity_id_sets().
    aliases = fm.get("aliases")
    if isinstance(aliases, str):
        aliases = [aliases]
    elif not isinstance(aliases, (list, tuple)):
        aliases = []
    for name in [fm.get("canonical_name"), *aliases]:
        key = lower(name)
        if key:
            names.setdefault(key, eid)


def build_name_index(ent_dir: Path = ENT) -> dict[str, dict[str, str]]:
    """Per class: {name_or_alias_lower: entity_id} from real entity files.

    Files are visited in sorted order so that alias collisions resolve the same
    way on every run and every filesystem.
    """
    index: dict[str, dict[str, str]] = {c: {} for c in CLASS_DIR}
    for cls, dirname in CLASS_DIR.items():
        d = ent_dir / dirname
        if not d.is_dir():
            continue
        for f in sorted(d.glob("*.md")):
            index_entity_names(index[cls], f.stem, parse_frontmatter(f))
    return index


def entity_id_sets(index) -> dict[str, set]:
    """Return, per class, the set of entity ids reachable through *index*."""
    return {cls: set(m.values()) for cls, m in index.items()}


# ── Relation extraction ─────────────────────────────────────────────────────
def _as_list(value) -> list:
    """Return *value* if it is a list, else an empty list."""
    return value if isinstance(value, list) else []


def _dicts(value) -> list[dict]:
    """Return only the dict entries of a (possibly malformed) JSON array."""
    return [item for item in _as_list(value) if isinstance(item, dict)]


def relations_for_doc(d, doc_id: str, index, ids) -> list[tuple]:
    """Extract relation rows from one parsed ``_reextract.json`` document.

    Args:
        d: Parsed JSON document; anything that is not a dict yields no rows.
        doc_id: Document id used as relation endpoint and as evidence_ref.
        index: Name index as returned by build_name_index().
        ids: Entity id sets as returned by entity_id_sets().

    Returns:
        Row tuples laid out as RELATION_COLUMNS, structural relations (events,
        then people) first and explicit relations[] last.
    """
    if not isinstance(d, dict):
        return []

    rows: list[tuple] = []
    evidence = f"[[{doc_id}]]" if doc_id else None

    def add(sc, sid, rtype, tc, tid, conf):
        # A relation is only recorded when both endpoints resolved.
        if sid and tid:
            rows.append((sc, sid, rtype, tc, tid, evidence, conf, EXTRACTED_BY))

    def resolve(cls, name):
        # Documents are referenced by id directly; every other class must hit
        # the name index exactly.
        if cls == "document":
            return (name.strip() or None) if isinstance(name, str) else None
        return index.get(cls, {}).get(lower(name))

    # Locations declared in this doc (clean names) → for substring match.
    doc_locs = []
    for loc in _dicts(d.get("locations")):
        raw_name = loc.get("name")
        nm = raw_name.strip() if isinstance(raw_name, str) else ""
        lid = canonicalize_name(nm)
        if nm and lid in ids["location"]:
            doc_locs.append((nm.lower(), lid))
    # Longest names first (more specific match).
    doc_locs.sort(key=lambda pair: -len(pair[0]))

    # A. structural from events[]
    for e in _dicts(d.get("events")):
        raw_label = e.get("label")
        label = raw_label.strip() if isinstance(raw_label, str) else ""
        if not label:
            continue
        date_start = e.get("date_start")
        if date_start is not None and not isinstance(date_start, str):
            continue  # no verifiable event_id can be derived from this date
        eid = event_id_from(label, date_start)
        if eid not in ids["event"]:
            continue  # event entity must exist
        conf = e.get("confidence") or "medium"

        # event documented_in document
        add("event", eid, "documented_in", "document", doc_id, "high")

        # observers witnessed event (entries are either {"name": ...} or a bare name)
        for o in _as_list(e.get("observers")):
            nm = o.get("name") if isinstance(o, dict) else o
            if not isinstance(nm, str) or not nm or lower(nm) == "unknown":
                continue
            pid = index["person"].get(lower(nm))
            if not pid:
                slug = canonicalize_name(nm)
                pid = slug if slug in ids["person"] else None
            add("person", pid, "witnessed", "event", eid, conf)

        # uap_objects involves_uap — the 1-based position counts every entry,
        # including skipped non-dict ones, so object ids stay aligned.
        for i, u in enumerate(_as_list(e.get("uap_objects_observed")), 1):
            if not isinstance(u, dict):
                continue
            oid = uap_object_id(eid, i)
            if oid in ids["uap_object"]:
                add("event", eid, "involves_uap", "uap_object", oid, conf)

        # event occurred_at location (substring match of doc locations)
        ploc = lower(e.get("primary_location_name"))
        if ploc:
            for lname, lid in doc_locs:
                if lname and lname in ploc:
                    add("event", eid, "occurred_at", "location", lid, "medium")
                    break

    # people mentioned_by document
    for p in _dicts(d.get("people")):
        nm = lower(p.get("name"))
        if nm and nm != "unknown":
            add("person", index["person"].get(nm), "mentioned_by", "document", doc_id, "medium")

    # B. explicit relations[] that resolve exactly. Rows duplicating an earlier
    # one are not filtered here; dedupe_rows() drops them later.
    for r in _dicts(d.get("relations")):
        sc, tc, rtype = r.get("source_class"), r.get("target_class"), r.get("type")
        if not all(isinstance(v, str) and v for v in (sc, tc, rtype)):
            continue
        sid = resolve(sc, r.get("source_name"))
        tid = resolve(tc, r.get("target_name"))
        add(sc, sid, rtype, tc, tid, r.get("confidence") or "medium")

    return rows


def collect_rows(index, ids, raw_dir: Path = RAW) -> tuple[list[tuple], int]:
    """Walk ``<raw_dir>/*--subagent/_reextract.json`` and gather relation rows.

    Returns:
        ``(rows, n_docs)`` where *n_docs* counts the documents that were
        successfully parsed. Unreadable or non-object JSON files are reported
        on stderr and skipped.
    """
    rows: list[tuple] = []
    n_docs = 0
    for jf in sorted(raw_dir.glob("*--subagent/_reextract.json")):
        doc_id = jf.parent.name.removesuffix("--subagent")
        try:
            d = json.loads(jf.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:  # ValueError covers JSON + decoding errors
            print(f"WARN: skipping unreadable {jf}: {exc}", file=sys.stderr)
            continue
        if not isinstance(d, dict):
            print(f"WARN: skipping {jf}: top-level JSON is not an object", file=sys.stderr)
            continue
        n_docs += 1
        rows.extend(relations_for_doc(d, doc_id, index, ids))
    return rows, n_docs


def dedupe_rows(rows: list[tuple]) -> list[tuple]:
    """Drop rows repeating an earlier (source, type, target) key.

    The first occurrence is kept, so its evidence_ref and confidence win.
    """
    seen: set[tuple] = set()
    deduped: list[tuple] = []
    for row in rows:
        key = row[:5]
        if key in seen:
            continue
        seen.add(key)
        deduped.append(row)
    return deduped


# ── Database load ───────────────────────────────────────────────────────────
def load_rows(conn, rows: list[tuple], *, truncate: bool = False) -> None:
    """Stage *rows* in a temp table and insert the valid ones into public.relations.

    Rows whose relation_type is not in VALID_RELATION_TYPES are not inserted
    and are reported per type. Commits on *conn* before returning.

    Args:
        conn: Open psycopg connection.
        rows: Deduplicated row tuples laid out as RELATION_COLUMNS.
        truncate: When true, TRUNCATE public.relations before loading.
    """
    # Inlined as SQL literals (module constants, never user input) so the
    # comparison is typed by the column exactly as in a hand-written IN list.
    type_list = ", ".join(f"'{t}'" for t in VALID_RELATION_TYPES)

    with conn.cursor() as cur:
        if truncate:
            cur.execute("TRUNCATE public.relations")
            print(" TRUNCATEd public.relations")

        cur.execute("CREATE TEMP TABLE _rel (LIKE public.relations INCLUDING DEFAULTS)")
        with cur.copy(f"COPY _rel ({RELATION_COLUMNS}) FROM STDIN") as cp:
            for row in rows:
                cp.write_row(row)

        cur.execute(
            f"INSERT INTO public.relations ({RELATION_COLUMNS}) "
            f"SELECT {RELATION_COLUMNS} FROM _rel "
            f"WHERE relation_type IN ({type_list}) "
            "ON CONFLICT DO NOTHING"
        )
        print(f"Inserted (after ON CONFLICT + type filter): {cur.rowcount}")

        cur.execute(
            "SELECT relation_type, COUNT(*) FROM _rel "
            f"WHERE relation_type NOT IN ({type_list}) "
            "GROUP BY relation_type ORDER BY 2 DESC"
        )
        drops = cur.fetchall()
        if drops:
            print("Dropped (invalid relation_type):")
            for t, n in drops:
                print(f" {n:>5} {t}")

        cur.execute(
            "SELECT relation_type, COUNT(*) FROM public.relations "
            "GROUP BY relation_type ORDER BY 2 DESC"
        )
        print("\n=== Relation counts in DB ===")
        for t, n in cur.fetchall():
            print(f" {n:>7} {t}")
    conn.commit()


def main() -> int:
    """Build the relation rows and load them; returns the process exit code.

    Exits with a message when neither DATABASE_URL nor SUPABASE_DB_URL is set.
    When nothing was extracted the database is not touched at all (not even
    for --truncate).
    """
    # Imported first so a missing driver fails before any work is done.
    import psycopg

    truncate = "--truncate" in sys.argv
    dburl = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL")
    if not dburl:
        sys.exit("DATABASE_URL not set")

    print("Building name→id index from wiki/entities ...")
    index = build_name_index()
    ids = entity_id_sets(index)
    for cls in CLASS_DIR:
        print(f" {cls}: {len(index[cls])} keys / {len(ids[cls])} ids")

    rows, n_docs = collect_rows(index, ids)
    print(f"\nProcessed {n_docs} docs; raw relation rows: {len(rows)}")

    # dedupe by (source, type, target) — keep first (evidence may vary)
    deduped = dedupe_rows(rows)
    print(f"After dedup: {len(deduped)}")
    if not deduped:
        return 0

    with psycopg.connect(dburl) as conn:
        load_rows(conn, deduped, truncate=truncate)
    return 0


if __name__ == "__main__":
    sys.exit(main())