273 lines
10 KiB
Python
Executable file
273 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
18-build-entity-index.py — Phase 7.5 — Pre-process entity↔OCR matches
|
|
|
|
Per page, scan its OCR text against the full alias index of all 14k+
|
|
entities. Produces `wiki/pages/<doc-id>/p<NNN>.matches.json` with:
|
|
|
|
[
|
|
{"entity_id": "j-edgar-hoover", "class": "people",
|
|
"alias_matched": "Hoover", "start": 423, "end": 429}
|
|
]
|
|
|
|
The frontend uses these to highlight entity mentions inline in the OCR text
|
|
and open a modal on click (no runtime string matching).
|
|
|
|
Performance:
|
|
- Builds one big regex with alternation (longest aliases first) covering all classes.
|
|
- Word boundaries enforced.
|
|
- ~10ms per page on the 14k alias index.
|
|
|
|
Idempotent. Run after `03-dedup-entities.py`. Re-run when entities change.
|
|
|
|
Usage:
|
|
./18-build-entity-index.py # all pages
|
|
./18-build-entity-index.py --doc-id <id>
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import unicodedata
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Filesystem layout. Every other path is derived from the project root.
UFO_ROOT = Path("/Users/guto/ufo")
WIKI = UFO_ROOT.joinpath("wiki")
# One sub-folder per document; page markdown and the generated
# `*.matches.json` files live side by side in there.
PAGES = WIKI.joinpath("pages")
# One sub-folder per entity class, holding one markdown file per entity.
ENTITIES = WIKI.joinpath("entities")
# Raw OCR text, read as `<doc-id>/p-NNN.txt`.
OCR_BASE = UFO_ROOT.joinpath("processing", "ocr")
|
|
|
|
# Entity folder name → class key used by the frontend.
# Every folder currently maps to a class key of the same name; the mapping is
# kept as a dict so the two can diverge later without touching callers.
CLASS_FOLDERS = {
    folder: folder
    for folder in (
        "people",
        "organizations",
        "locations",
        "events",
        "uap-objects",
        "vehicles",
        "operations",
        "concepts",
    )
}
|
|
|
|
# Minimum alias length (in characters). Anything shorter is skipped because
# it produces too many false positives on common words.
MIN_ALIAS_LEN = 3

# Stop-aliases: common words and phrases that the vision pass extracted as
# entities and that would otherwise generate runaway matches. Entries are
# compared against the normalized (accent-stripped, lowercased) alias.
STOP_ALIASES = {
    # function words
    "the", "and", "for", "with", "from",
    "this", "that", "have", "has", "had",
    # pronouns
    "they", "them", "their", "his", "her", "him", "she", "you", "your",
    # generic document vocabulary
    "page", "report", "document", "subject", "date", "time", "file", "case",
    "memo", "letter", "office", "section", "general", "agent", "info",
    # generic noun phrases
    "the bureau", "the agency", "the department", "the office",
    "the file", "the case", "the report",
    # abbreviations and cross-reference markers
    "see", "ref", "via", "etc", "inc", "ltd", "etc.", "i.e.", "e.g.",
    # short answers
    "yes", "no", "ok",
}
|
|
|
|
|
|
def normalize(s: str) -> str:
    """Return *s* with accents stripped and lowercased.

    The string is canonically decomposed (NFD) so that accents become
    separate combining marks, those marks are discarded, and the remaining
    characters are lowercased.
    """
    decomposed = unicodedata.normalize("NFD", s)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars).lower()
|
|
|
|
|
|
def read_md_fm(path: Path) -> dict:
    """Read the YAML front matter of a markdown file.

    The front matter is the text between an opening ``---`` at the very start
    of the file and the next ``---`` that begins a line.

    Args:
        path: Markdown file to read (UTF-8).

    Returns:
        The parsed front matter mapping, or ``{}`` when the file is missing,
        has no opening or closing fence, contains invalid YAML, or the YAML
        document is not a mapping.
    """
    try:
        content = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    if not content.startswith("---"):
        return {}
    # The closing fence must start a line. Searching for a bare "---" would
    # also hit dashes inside a value ("title: a --- b") and truncate the YAML.
    end = content.find("\n---", 3)
    if end == -1:
        return {}
    try:
        data = yaml.safe_load(content[3:end].strip())
    except yaml.YAMLError:
        return {}
    # safe_load may return a list, a scalar or None; callers rely on `.get`.
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def collect_aliases() -> list[tuple[str, str, str, str]]:
    """Collect every usable alias of every entity on disk.

    Walks ``ENTITIES/<folder>/*.md`` for each folder in ``CLASS_FOLDERS`` and
    reads ``canonical_name`` and ``aliases`` from the front matter.

    An alias is dropped when it is shorter than ``MIN_ALIAS_LEN``, when its
    normalized form is in ``STOP_ALIASES``, or when the normalized form
    contains no ASCII letter.

    Files and names are visited in a fixed order (sorted file names, canonical
    name first, then aliases in file order) so that repeated runs produce the
    same rows in the same order. This matters downstream, where the first
    entity seen for a shared alias wins.

    Returns:
        Rows of ``(alias_normalized, alias_original, class, entity_id)``,
        where ``entity_id`` is the markdown file's stem.
    """
    rows: list[tuple[str, str, str, str]] = []
    for folder, cls in CLASS_FOLDERS.items():
        folder_dir = ENTITIES / folder
        if not folder_dir.exists():
            continue
        # glob() order is filesystem-dependent; sort for reproducible output.
        for md_file in sorted(folder_dir.glob("*.md")):
            fm = read_md_fm(md_file)
            if not fm or not isinstance(fm, dict):
                continue

            raw_aliases = fm.get("aliases") or []
            if isinstance(raw_aliases, str):
                # A scalar `aliases: Foo` is a single alias, not a list of
                # characters.
                raw_aliases = [raw_aliases]
            elif not isinstance(raw_aliases, (list, tuple)):
                raw_aliases = []

            # dict.fromkeys de-duplicates while keeping insertion order.
            names = dict.fromkeys(
                value.strip()
                for value in (fm.get("canonical_name"), *raw_aliases)
                if isinstance(value, str) and value.strip()
            )

            entity_id = md_file.stem
            for name in names:
                if len(name) < MIN_ALIAS_LEN:
                    continue
                norm = normalize(name)
                if norm in STOP_ALIASES:
                    continue
                if not re.search(r"[a-z]", norm):
                    continue
                rows.append((norm, name, cls, entity_id))
    return rows
|
|
|
|
|
|
def build_megaregex(aliases: list[tuple[str, str, str, str]]) -> tuple[re.Pattern, list[tuple[str, str, str]]]:
    """Build one case-insensitive alternation regex over all aliases.

    Aliases are ordered longest-first so that, at any given position, the
    longest alias wins. When several rows share the same normalized alias,
    only the first one (after the stable length sort) is kept.

    Args:
        aliases: Rows of ``(alias_normalized, alias_original, class,
            entity_id)``.

    Returns:
        ``(pattern, payload)`` where ``payload[i]`` is
        ``(alias_original, class, entity_id)`` for the i-th alternative of
        the pattern. With no usable alias the pattern never matches.
    """
    # Stable sort: rows of equal length keep their input order.
    by_length = sorted(aliases, key=lambda row: -len(row[0]))
    parts: list[str] = []
    payload: list[tuple[str, str, str]] = []
    seen: set[str] = set()
    for norm, orig, cls, eid in by_length:
        # An empty alternative would match the empty string everywhere.
        if not norm or norm in seen:
            continue
        seen.add(norm)
        parts.append(re.escape(norm))
        payload.append((orig, cls, eid))

    if not parts:
        # `(?!)` is an empty negative lookahead: it can never match.
        return re.compile(r"(?!)"), payload

    # Lookarounds instead of `\b`: `\b` needs a word character on one side, so
    # an alias ending in punctuation ("u.s.") could never match before a
    # space. For aliases that start and end with word characters the two
    # forms are equivalent.
    big = r"(?<!\w)(?:" + "|".join(parts) + r")(?!\w)"
    return re.compile(big, re.IGNORECASE), payload
|
|
|
|
|
|
def match_page_text(ocr_text: str, pat: re.Pattern, payload: list[tuple[str, str, str]],
                    alias_to_idx: dict[str, int]) -> list[dict]:
    """Find entity mentions in one page of OCR text.

    The regex runs on a folded view of the text (NFD-decomposed, combining
    marks removed, lowercased). The reported offsets refer to the ORIGINAL
    ``ocr_text`` so the frontend can slice it directly.

    The folded view and the folded→original index map are built in a single
    pass, with one map entry per folded character. They therefore stay aligned
    even when one source character folds to zero characters (a standalone
    combining mark) or to several (e.g. a Hangul syllable decomposing into
    jamo).

    Args:
        ocr_text: Raw OCR text of the page.
        pat: Compiled alias pattern, matched against the folded text.
        payload: ``payload[i] = (alias_original, class, entity_id)``.
        alias_to_idx: Normalized alias text → index into ``payload``.

    Returns:
        One dict per match, in text order, with keys ``entity_id``, ``class``,
        ``alias_matched`` (the slice of ``ocr_text``), ``start`` and ``end``
        (end exclusive). Trailing combining marks that belong to the last
        matched character are included in the span.
    """
    folded_chars: list[str] = []
    folded_to_ocr: list[int] = []  # folded_to_ocr[k] = ocr_text index of folded[k]
    for pos, ch in enumerate(ocr_text):
        for base in unicodedata.normalize("NFD", ch):
            if unicodedata.combining(base):
                continue
            # lower() can in principle yield more than one character; keep
            # the map at exactly one entry per folded character.
            for low in base.lower():
                folded_chars.append(low)
                folded_to_ocr.append(pos)
    folded = "".join(folded_chars)
    # Sentinel so that an end offset of len(folded) maps to len(ocr_text).
    folded_to_ocr.append(len(ocr_text))

    matches: list[dict] = []
    for m in pat.finditer(folded):
        start_f, end_f = m.span()
        if start_f == end_f:
            continue  # empty match carries no mention
        idx = alias_to_idx.get(m.group(0))
        if idx is None:
            continue
        _alias, cls, eid = payload[idx]
        start_o = folded_to_ocr[start_f]
        # End at the next folded character's source position so trailing
        # combining marks are kept, but always past the last matched
        # source character.
        end_o = max(folded_to_ocr[end_f], folded_to_ocr[end_f - 1] + 1)
        matches.append({
            "entity_id": eid,
            "class": cls,
            "alias_matched": ocr_text[start_o:end_o],
            "start": start_o,
            "end": end_o,
        })
    return matches
|
|
|
|
|
|
def process_page(doc_dir: Path, page_md: Path, pat: re.Pattern, payload: list[tuple[str, str, str]],
                 alias_to_idx: dict[str, int], force: bool) -> bool:
    """Write ``<stem>.matches.json`` for one page, next to its markdown file.

    Args:
        doc_dir: Document folder; its name is the doc id used to locate OCR.
        page_md: Page markdown file whose stem starts with ``p<digits>``
            (e.g. ``p007``).
        pat: Compiled alias pattern.
        payload: Payload table that goes with ``pat``.
        alias_to_idx: Normalized alias → payload index.
        force: Rewrite the output even if it already exists.

    Returns:
        ``True`` if the JSON file was written. ``False`` if the stem is not a
        page stem, the output already exists (and ``force`` is off), or the
        OCR text file is missing.
    """
    stem = page_md.stem
    page_match = re.match(r"p(\d+)", stem)
    if page_match is None:
        return False
    page_num = int(page_match.group(1))

    target = doc_dir / f"{stem}.matches.json"
    if not force and target.exists():
        return False

    # OCR files are named with a zero-padded, dash-separated page number.
    ocr_file = OCR_BASE / doc_dir.name / f"p-{page_num:03d}.txt"
    try:
        page_text = ocr_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        return False

    found = match_page_text(page_text, pat, payload, alias_to_idx)
    target.write_text(json.dumps(found, ensure_ascii=False), encoding="utf-8")
    return True
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: build the alias index and process pages.

    Options:
        --doc-id ID   Only process the document folder ``PAGES/ID``.
        --force       Rewrite ``*.matches.json`` files that already exist.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", help="single doc")
    ap.add_argument("--force", action="store_true")
    args = ap.parse_args()

    print("Collecting aliases from all entities...", flush=True)
    aliases = collect_aliases()
    print(f"  {len(aliases)} (alias, entity) pairs", flush=True)

    print("Building mega-regex...", flush=True)
    pat, payload = build_megaregex(aliases)
    # The pattern is matched against folded (accent-stripped, lowercased)
    # text, so the lookup table must be keyed by the normalized alias.
    alias_to_idx = {normalize(orig): i for i, (orig, _cls, _eid) in enumerate(payload)}
    print(f"  pattern has {len(payload)} unique aliases", flush=True)

    if args.doc_id:
        docs = [PAGES / args.doc_id]
    else:
        docs = sorted(d for d in PAGES.iterdir() if d.is_dir())

    total_written = 0
    total_pages = 0
    for doc_dir in docs:
        for page_md in sorted(doc_dir.glob("p*.md")):
            total_pages += 1
            if process_page(doc_dir, page_md, pat, payload, alias_to_idx, args.force):
                total_written += 1
    print(f"\nDone: {total_written} matches.json (re)written across {total_pages} pages", flush=True)


if __name__ == "__main__":
    main()
|