disclosure-bureau/scripts/18-build-entity-index.py

273 lines
10 KiB
Python
Executable file

#!/usr/bin/env python3
"""
18-build-entity-index.py — Fase 7.5 — Pre-process entity↔OCR matches
Per page, scan its OCR text against the full alias index of all 14k+
entities. Produces `wiki/pages/<doc-id>/p<NNN>.matches.json` with:
[
{"entity_id": "j-edgar-hoover", "class": "people",
"alias_matched": "Hoover", "start": 423, "end": 429}
]
The frontend uses these to highlight entity mentions inline in the OCR text
and open a modal on click (no runtime string matching).
Performance:
- Builds one big regex with alternation (longest aliases first) covering every entity class.
- Word boundaries enforced.
- ~10ms per page on the 14k alias index.
Idempotent: pages that already have a matches.json are skipped unless --force is given.
Run after `03-dedup-entities.py`. Re-run with --force when entities change.
Usage:
./18-build-entity-index.py # all pages
./18-build-entity-index.py --doc-id <id>
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import unicodedata
from pathlib import Path
try:
import yaml
except ImportError:
sys.stderr.write("pip3 install pyyaml\n")
sys.exit(1)
# Root of the archive checkout. Defaults to the original author's location;
# set the UFO_ROOT environment variable to run against a different checkout.
UFO_ROOT = Path(os.environ.get("UFO_ROOT") or "/Users/guto/ufo")
WIKI = UFO_ROOT / "wiki"
# Per-document page folders: wiki/pages/<doc-id>/pNNN.md (+ generated *.matches.json).
PAGES = WIKI / "pages"
# Entity definitions: wiki/entities/<class-folder>/<entity-id>.md.
ENTITIES = WIKI / "entities"
# Raw OCR text, read as processing/ocr/<doc-id>/p-NNN.txt.
OCR_BASE = UFO_ROOT / "processing" / "ocr"
# Entity sub-folder name → class key emitted to the frontend.
# Today every folder name doubles as its own class key.
CLASS_FOLDERS = {
    folder_name: folder_name
    for folder_name in (
        "people",
        "organizations",
        "locations",
        "events",
        "uap-objects",
        "vehicles",
        "operations",
        "concepts",
    )
}
# Minimum alias length (in characters) that is still indexed; anything shorter
# produces too many false positives on common words.
MIN_ALIAS_LEN = 3
# Normalized aliases that are never indexed: common nouns and function words
# that the vision pass extracted as entities and that would otherwise generate
# runaway matches.
STOP_ALIASES = set(
    (
        "the and for with from this that have has had "
        "they them their his her him she you your "
        "page report document subject date time file case "
        "memo letter office section general agent info "
        "see ref via etc inc ltd "
        "yes no ok etc. i.e. e.g."
    ).split()
) | {
    # Multi-word phrases cannot go through the whitespace split above.
    "the bureau",
    "the agency",
    "the department",
    "the office",
    "the file",
    "the case",
    "the report",
}
def normalize(s: str) -> str:
    """Return *s* lowercased with its diacritics removed.

    The text is canonically decomposed (NFD) so that accents become separate
    combining code points; those are dropped and the remainder is lowercased.
    """
    decomposed = unicodedata.normalize("NFD", s)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars).lower()
def read_md_fm(path: Path) -> dict:
    """Parse the YAML front matter of a Markdown file.

    The front matter is the text between a leading ``---`` and the next
    ``---`` that starts a line.

    Args:
        path: Markdown file to read (UTF-8).

    Returns:
        The front matter as a dict. An empty dict is returned when the file
        does not exist, has no (terminated) front matter, the YAML is invalid,
        or the YAML document is not a mapping.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    if not content.startswith("---"):
        return {}
    # The closing fence must begin a line; a bare "---" inside a value
    # (e.g. "title: a --- b") must not cut the front matter short.
    end = content.find("\n---", 3)
    if end == -1:
        return {}
    try:
        data = yaml.safe_load(content[3:end].strip())
    except yaml.YAMLError:
        return {}
    # safe_load returns None for an empty document and may return a list or a
    # scalar; callers use .get(), so anything that is not a mapping becomes {}.
    return data if isinstance(data, dict) else {}
def collect_aliases() -> list[tuple[str, str, str, str]]:
    """Gather every usable name of every entity under ``ENTITIES``.

    For each class folder listed in ``CLASS_FOLDERS``, each ``*.md`` file is one
    entity (its id is the file stem). The entity's ``canonical_name`` and
    ``aliases`` front-matter fields supply the candidate names.

    A name is dropped when it is shorter than ``MIN_ALIAS_LEN``, when its
    normalized form is in ``STOP_ALIASES``, or when its normalized form contains
    no ASCII letter.

    Returns:
        Rows of ``(alias_normalized_lower, alias_original, class, entity_id)``,
        in a deterministic order (class folder order, then file name, then name).
    """
    rows: list[tuple[str, str, str, str]] = []
    for folder, cls in CLASS_FOLDERS.items():
        class_dir = ENTITIES / folder
        if not class_dir.exists():
            continue
        # glob() yields files in filesystem order. Sort so that, when several
        # entities share an alias, the same one is seen first on every run.
        for entity_file in sorted(class_dir.glob("*.md")):
            entity_id = entity_file.stem
            fm = read_md_fm(entity_file)
            if not fm:
                continue

            raw_aliases = fm.get("aliases")
            if isinstance(raw_aliases, str):
                # A lone scalar where a list was intended: treat it as one alias
                # instead of iterating over its characters.
                raw_aliases = [raw_aliases]
            elif not isinstance(raw_aliases, (list, tuple)):
                raw_aliases = []

            names = {
                candidate.strip()
                for candidate in (fm.get("canonical_name"), *raw_aliases)
                if isinstance(candidate, str) and candidate.strip()
            }
            # Sets iterate in hash order, which varies between runs; sort them.
            for name in sorted(names):
                if len(name) < MIN_ALIAS_LEN:
                    continue
                norm = normalize(name)
                if norm in STOP_ALIASES:
                    continue
                if not re.search(r"[a-z]", norm):
                    continue
                rows.append((norm, name, cls, entity_id))
    return rows
def build_megaregex(aliases: list[tuple[str, str, str, str]]) -> tuple[re.Pattern, list[tuple[str, str, str]]]:
    """Build one case-insensitive regex that matches any known alias.

    Args:
        aliases: Rows of ``(alias_normalized, alias_original, class, entity_id)``.

    Returns:
        ``(pattern, payload)`` where ``payload[i]`` is
        ``(alias_original, class, entity_id)`` for the i-th alternative of the
        pattern. When several rows share a normalized alias, only the first one
        (after the stable longest-first sort) is kept. With no usable aliases
        the returned pattern never matches.
    """
    # Longest first: regex alternation is ordered, so the longest alias wins
    # when one alias is a prefix of another.
    sorted_aliases = sorted(aliases, key=lambda row: -len(row[0]))
    parts: list[str] = []
    payload: list[tuple[str, str, str]] = []
    seen: set[str] = set()
    for norm, orig, cls, eid in sorted_aliases:
        # An empty alternative would match the empty string everywhere.
        if not norm or norm in seen:
            continue
        seen.add(norm)
        parts.append(re.escape(norm))
        payload.append((orig, cls, eid))

    if not parts:
        # "(?!)" is an always-failing lookahead: a valid pattern with no matches.
        return re.compile(r"(?!)"), payload

    # Lookarounds instead of \b: \b needs a word character on one side, so an
    # alias that starts or ends with punctuation ("U.S.") would only match when
    # glued to another word. "Not adjacent to a word character" is the intended
    # rule and is identical to \b for aliases that start and end with a letter.
    big = r"(?<!\w)(?:" + "|".join(parts) + r")(?!\w)"
    return re.compile(big, re.IGNORECASE), payload
def match_page_text(ocr_text: str, pat: re.Pattern, payload: list[tuple[str, str, str]],
                    alias_to_idx: dict[str, int]) -> list[dict]:
    """Find entity mentions in one page of OCR text.

    The search runs on a "folded" view of the page (NFD-decomposed, combining
    marks dropped, lowercased) so that it lines up with the normalized aliases
    the pattern was built from. Offsets are then mapped back so that ``start``
    and ``end`` index into the ORIGINAL ``ocr_text``.

    Args:
        ocr_text: Raw OCR text of the page.
        pat: Compiled alias pattern.
        payload: ``payload[i] = (alias_original, class, entity_id)``.
        alias_to_idx: Normalized alias text → index into ``payload``.

    Returns:
        One dict per match, in text order, with keys ``entity_id``, ``class``,
        ``alias_matched`` (the slice of ``ocr_text``), ``start`` and ``end``
        (``end`` exclusive). Matches whose text is not a key of
        ``alias_to_idx`` are skipped.
    """
    # Build the folded text and, in lockstep, folded_to_ocr[k] = index in
    # ocr_text of the character that produced folded[k]. One source character
    # can produce zero folded characters (a bare combining mark) or several
    # (e.g. a Hangul syllable decomposes into multiple base characters), so the
    # map gets exactly one entry per folded character.
    folded_chars: list[str] = []
    folded_to_ocr: list[int] = []
    for pos, ch in enumerate(ocr_text):
        if ch.isascii():
            # Fast path: ASCII is unaffected by NFD and has no combining marks.
            folded_chars.append(ch.lower())
            folded_to_ocr.append(pos)
            continue
        for base in unicodedata.normalize("NFD", ch):
            if unicodedata.combining(base):
                continue
            lowered = base.lower()
            folded_chars.append(lowered)
            folded_to_ocr.extend([pos] * len(lowered))
    folded = "".join(folded_chars)
    folded_len = len(folded_to_ocr)

    matches: list[dict] = []
    for m in pat.finditer(folded):
        start_f, end_f = m.span()
        if start_f == end_f:
            continue  # an empty match carries no mention
        idx = alias_to_idx.get(m.group(0))
        if idx is None:
            continue
        _alias, cls, eid = payload[idx]

        start_o = folded_to_ocr[start_f]
        last_o = folded_to_ocr[end_f - 1]
        # Everything between the last matched character and the next folded
        # character is combining marks belonging to the match (e.g. "e" + U+0301);
        # keep them inside the span so the highlight does not split a letter.
        next_o = folded_to_ocr[end_f] if end_f < folded_len else len(ocr_text)
        end_o = max(last_o + 1, next_o)

        matches.append({
            "entity_id": eid,
            "class": cls,
            "alias_matched": ocr_text[start_o:end_o],
            "start": start_o,
            "end": end_o,
        })
    return matches
def process_page(doc_dir: Path, page_md: Path, pat: re.Pattern, payload: list[tuple[str, str, str]],
                 alias_to_idx: dict[str, int], force: bool) -> bool:
    """Write ``<stem>.matches.json`` next to one page file.

    The page number is taken from the leading ``p<digits>`` of the page file's
    stem and used to locate the OCR text at
    ``OCR_BASE/<doc-id>/p-<NNN>.txt`` (zero-padded to three digits).

    Returns:
        True when a matches file was written. False when the stem carries no
        page number, when the output already exists and ``force`` is false, or
        when the OCR file is missing.
    """
    page_stem = page_md.stem  # e.g. "p007"
    number_match = re.match(r"p(\d+)", page_stem)
    if number_match is None:
        return False

    target = doc_dir / f"{page_stem}.matches.json"
    if not force and target.exists():
        return False

    ocr_file = OCR_BASE / doc_dir.name / f"p-{int(number_match.group(1)):03d}.txt"
    try:
        page_text = ocr_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        return False

    found = match_page_text(page_text, pat, payload, alias_to_idx)
    target.write_text(json.dumps(found, ensure_ascii=False), encoding="utf-8")
    return True
def main():
    """Command-line entry point: index entity mentions for one or all documents.

    Options:
        --doc-id ID   Only process the page folder ``PAGES/ID``.
        --force       Rewrite matches files that already exist.

    Collects all aliases, compiles them into a single pattern, then processes
    every ``p*.md`` page of each selected document and prints a summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", help="single doc")
    ap.add_argument("--force", action="store_true")
    args = ap.parse_args()

    print("Collecting aliases from all entities...", flush=True)
    aliases = collect_aliases()
    print(f" {len(aliases)} (alias, entity) pairs", flush=True)

    print("Building mega-regex...", flush=True)
    pat, payload = build_megaregex(aliases)
    # The pattern is matched against folded (accent-stripped, lowercased) text,
    # so the lookup key for a match is the normalized form of each alias.
    alias_to_idx = {normalize(orig): i for i, (orig, _cls, _eid) in enumerate(payload)}
    print(f" pattern has {len(payload)} unique aliases", flush=True)

    if args.doc_id:
        doc_dir = PAGES / args.doc_id
        if doc_dir.is_dir():
            docs = [doc_dir]
        else:
            # A mistyped id would otherwise just report "0 pages" with no hint.
            sys.stderr.write(f"warning: no page directory for doc-id {args.doc_id!r}: {doc_dir}\n")
            docs = []
    else:
        docs = sorted(d for d in PAGES.iterdir() if d.is_dir())

    total_written = 0
    total_pages = 0
    for doc_dir in docs:
        for page_md in sorted(doc_dir.glob("p*.md")):
            total_pages += 1
            if process_page(doc_dir, page_md, pat, payload, alias_to_idx, args.force):
                total_written += 1
    print(f"\nDone: {total_written} matches.json (re)written across {total_pages} pages", flush=True)


if __name__ == "__main__":
    main()