#!/usr/bin/env python3 """ 18-build-entity-index.py — Fase 7.5 — Pre-process entity↔OCR matches Per page, scan its OCR text against the full alias index of all 14k+ entities. Produces `wiki/pages//p.matches.json` with: [ {"entity_id": "j-edgar-hoover", "class": "people", "alias_matched": "Hoover", "start": 423, "end": 429} ] The frontend uses these to highlight entity mentions inline in the OCR text and open a modal on click (no runtime string matching). Performance: - Builds one big regex with alternation (longest-aliases-first) per class. - Word boundaries enforced. - ~10ms per page on the 14k alias index. Idempotent. Run after `03-dedup-entities.py`. Re-run when entities change. Usage: ./18-build-entity-index.py # all pages ./18-build-entity-index.py --doc-id """ from __future__ import annotations import argparse import json import re import sys import unicodedata from pathlib import Path try: import yaml except ImportError: sys.stderr.write("pip3 install pyyaml\n") sys.exit(1) UFO_ROOT = Path("/Users/guto/ufo") WIKI = UFO_ROOT / "wiki" PAGES = WIKI / "pages" ENTITIES = WIKI / "entities" OCR_BASE = UFO_ROOT / "processing" / "ocr" # Folder name → class key used by the frontend CLASS_FOLDERS = { "people": "people", "organizations": "organizations", "locations": "locations", "events": "events", "uap-objects": "uap-objects", "vehicles": "vehicles", "operations": "operations", "concepts": "concepts", } # Aliases shorter than this are skipped (too many false positives on common words) MIN_ALIAS_LEN = 3 # Stop-aliases — common nouns extracted as entities by the vision pass that # would generate runaway matches. 
STOP_ALIASES = {
    "the", "and", "for", "with", "from", "this", "that", "have", "has", "had",
    "they", "them", "their", "his", "her", "him", "she", "you", "your",
    "page", "report", "document", "subject", "date", "time", "file", "case",
    "memo", "letter", "office", "section", "general", "agent", "info",
    "see", "ref", "via", "etc", "inc", "ltd",
    "the bureau", "the agency", "the department", "the office", "the file",
    "the case", "the report",
    "yes", "no", "ok", "etc.", "i.e.", "e.g.",
}


def normalize(s: str) -> str:
    """Return *s* lower-cased with combining marks (accents) stripped.

    The string is NFD-decomposed so that accented letters split into a base
    character plus combining marks; the marks are then dropped.
    """
    decomposed = unicodedata.normalize("NFD", s)
    return "".join(c for c in decomposed if not unicodedata.combining(c)).lower()


def read_md_fm(path: Path) -> dict:
    """Return the YAML front matter of the markdown file at *path* as a dict.

    An empty dict is returned when the file does not exist, does not start
    with a ``---`` fence, has no closing fence, contains invalid YAML, or
    when the front matter does not parse to a mapping.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    if not content.startswith("---"):
        return {}
    end = content.find("---", 4)
    if end == -1:
        return {}
    try:
        data = yaml.safe_load(content[3:end].strip())
    except yaml.YAMLError:
        return {}
    # Front matter such as a bare list or scalar is valid YAML but useless to
    # callers, which rely on ``.get``; treat it like missing front matter.
    return data if isinstance(data, dict) else {}


def collect_aliases() -> list[tuple[str, str, str, str]]:
    """Returns list of (alias_normalized_lower, alias_original, class, entity_id).

    Walks every ``*.md`` file in each folder of ``CLASS_FOLDERS`` under
    ``ENTITIES``; the entity id is the file stem. The ``canonical_name`` and
    every entry of ``aliases`` in the front matter become candidate names.
    A name is dropped when it is shorter than ``MIN_ALIAS_LEN``, when its
    normalized form is in ``STOP_ALIASES``, or when the normalized form
    contains no ASCII letter.

    Files and names are visited in sorted order so that the result (and
    therefore which entity wins a shared alias downstream) is reproducible.
    """
    rows: list[tuple[str, str, str, str]] = []
    for folder, cls in CLASS_FOLDERS.items():
        folder_dir = ENTITIES / folder
        if not folder_dir.exists():
            continue
        for md_path in sorted(folder_dir.glob("*.md")):
            fm = read_md_fm(md_path)
            if not fm:
                continue
            names: set[str] = set()
            cname = fm.get("canonical_name")
            if isinstance(cname, str) and cname.strip():
                names.add(cname.strip())
            raw_aliases = fm.get("aliases") or []
            if isinstance(raw_aliases, str):
                # `aliases: Hoover` (scalar) means one alias, not six letters.
                raw_aliases = [raw_aliases]
            elif not isinstance(raw_aliases, list):
                raw_aliases = []
            for alias in raw_aliases:
                if isinstance(alias, str) and alias.strip():
                    names.add(alias.strip())
            for name in sorted(names):
                if len(name) < MIN_ALIAS_LEN:
                    continue
                norm = normalize(name)
                if norm in STOP_ALIASES:
                    continue
                if not re.search(r"[a-z]", norm):
                    continue
                rows.append((norm, name, cls, md_path.stem))
    return rows


def build_megaregex(aliases: list[tuple[str, str, str, str]]) -> tuple[re.Pattern, list[tuple[str, str, str]]]:
    """Build one big regex with alternation, longest-first.

    Returns (compiled_pattern, payload_table) where
    payload[i] = (alias_original, class, entity_id) for the i-th unique
    normalized alias. When several rows share a normalized alias, the first
    one in length-sorted (stable) order is kept.

    The alternation is guarded by ``(?<!\\w)`` / ``(?!\\w)`` rather than
    ``\\b``: ``\\b`` needs a word character on one side, so an alias that
    ends in punctuation (e.g. "U.S.") could never match before a space and
    would wrongly match inside "U.S.A". For aliases that start and end with a
    word character the two forms are equivalent.

    With no usable aliases the returned pattern never matches.
    """
    # Sort by length DESC so the longest alias wins on overlap.
    ranked = sorted(aliases, key=lambda row: -len(row[0]))
    parts: list[str] = []
    payload: list[tuple[str, str, str]] = []
    seen: set[str] = set()
    for norm, orig, cls, eid in ranked:
        # An empty alternative would match the empty string everywhere.
        if not norm or norm in seen:
            continue
        seen.add(norm)
        parts.append(re.escape(norm))
        payload.append((orig, cls, eid))
    if not parts:
        # An empty negative lookahead can never succeed.
        return re.compile(r"(?!)"), payload
    big = r"(?<!\w)(?:" + "|".join(parts) + r")(?!\w)"
    return re.compile(big, re.IGNORECASE), payload


def match_page_text(ocr_text: str, pat: re.Pattern,
                    payload: list[tuple[str, str, str]],
                    alias_to_idx: dict[str, int]) -> list[dict]:
    """Return the entity mentions found in *ocr_text* as a list of dicts.

    The search runs on a folded view of the text (NFD-decomposed, combining
    marks dropped, lower-cased), which is the same folding ``normalize``
    applies to aliases. ``start``/``end`` in the result are offsets into the
    ORIGINAL ``ocr_text`` so the frontend can slice it directly.

    Args:
        ocr_text: raw OCR text of one page.
        pat: pattern produced by ``build_megaregex``.
        payload: payload table produced by ``build_megaregex``.
        alias_to_idx: maps a normalized alias to its index in *payload*.

    Returns:
        One dict per match with keys ``entity_id``, ``class``,
        ``alias_matched`` (the original-text slice), ``start`` and ``end``.
        Matches whose folded text is not a key of *alias_to_idx* are skipped.
    """
    # Build the folded text and the folded→original index map in one pass so
    # they can never get out of step. A single source character may fold to
    # zero characters (a lone combining mark) or to several (e.g. a Hangul
    # syllable decomposes into multiple jamo).
    folded_chars: list[str] = []
    folded_to_ocr: list[int] = []
    for pos, ch in enumerate(ocr_text):
        for base in unicodedata.normalize("NFD", ch):
            if unicodedata.combining(base):
                continue
            lowered = base.lower()
            folded_chars.append(lowered)
            folded_to_ocr.extend([pos] * len(lowered))
    folded = "".join(folded_chars)
    folded_len = len(folded)
    ocr_len = len(ocr_text)

    matches: list[dict] = []
    for m in pat.finditer(folded):
        start_f, end_f = m.span()
        if start_f == end_f:
            continue
        idx = alias_to_idx.get(m.group(0))
        if idx is None:
            continue
        _orig, cls, eid = payload[idx]
        start_o = folded_to_ocr[start_f]
        # End just before the next base character so that combining marks
        # trailing the last matched letter stay inside the slice; never end
        # before the last matched source character itself.
        after_last = folded_to_ocr[end_f - 1] + 1
        next_base = folded_to_ocr[end_f] if end_f < folded_len else ocr_len
        end_o = max(after_last, next_base)
        matches.append({
            "entity_id": eid,
            "class": cls,
            "alias_matched": ocr_text[start_o:end_o],
            "start": start_o,
            "end": end_o,
        })
    return matches


def process_page(doc_dir: Path, page_md: Path, pat: re.Pattern,
                 payload: list[tuple[str, str, str]],
                 alias_to_idx: dict[str, int], force: bool) -> bool:
    """Write ``<stem>.matches.json`` next to *page_md*; return True if written.

    Nothing is written (and False is returned) when the page stem does not
    start with ``p<digits>``, when the output already exists and *force* is
    false, or when the OCR file
    ``OCR_BASE/<doc_dir name>/p-<page number, 3 digits>.txt`` is missing.
    """
    doc_id = doc_dir.name
    stem = page_md.stem  # e.g., "p007"
    m = re.match(r"p(\d+)", stem)
    if not m:
        return False
    page_num = int(m.group(1))
    out = doc_dir / f"{stem}.matches.json"
    if out.exists() and not force:
        return False
    padded = f"{page_num:03d}"
    ocr_path = OCR_BASE / doc_id / f"p-{padded}.txt"
    try:
        ocr_text = ocr_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return False
    matches = match_page_text(ocr_text, pat, payload, alias_to_idx)
    out.write_text(json.dumps(matches, ensure_ascii=False), encoding="utf-8")
    return True


def main():
    """CLI entry point: build the alias regex and process the selected pages.

    ``--doc-id`` restricts the run to one directory under ``PAGES``;
    ``--force`` overwrites existing ``*.matches.json`` files.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", help="single doc")
    ap.add_argument("--force", action="store_true")
    args = ap.parse_args()

    print("Collecting aliases from all entities...", flush=True)
    aliases = collect_aliases()
    print(f"  {len(aliases)} (alias, entity) pairs", flush=True)

    print("Building mega-regex...", flush=True)
    pat, payload = build_megaregex(aliases)
    # The pattern runs on folded text, so look matches up by the normalized
    # form of each payload alias.
    alias_to_idx = {normalize(orig): i for i, (orig, _cls, _eid) in enumerate(payload)}
    print(f"  pattern has {len(payload)} unique aliases", flush=True)

    if args.doc_id:
        docs = [PAGES / args.doc_id]
    else:
        docs = sorted(d for d in PAGES.iterdir() if d.is_dir())

    total_written = 0
    total_pages = 0
    for doc_dir in docs:
        for page_md in sorted(doc_dir.glob("p*.md")):
            total_pages += 1
            if process_page(doc_dir, page_md, pat, payload, alias_to_idx, args.force):
                total_written += 1

    print(f"\nDone: {total_written} matches.json (re)written across {total_pages} pages", flush=True)


if __name__ == "__main__":
    main()