207 lines
7 KiB
Python
Executable file
207 lines
7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
31-populate-entity-mentions.py — Materialize the public.entity_mentions table.

Reads wiki/entities/<class>/<id>.md frontmatter for canonical_name + aliases.
For each entity, scans public.chunks via ILIKE (powered by pg_trgm GIN index)
to find chunks where the entity appears literally, then inserts
entity_mentions rows linking chunk_pk ↔ entity_pk (existing pairs are left
untouched).

Each entity is also upserted into public.entities: inserted when missing,
refreshed from the frontmatter otherwise.

Environment:
    DATABASE_URL (or SUPABASE_DB_URL)   Postgres connection string (required)
    UFO_ROOT                            directory that contains wiki/

Usage:
    ./31-populate-entity-mentions.py                   # all classes
    ./31-populate-entity-mentions.py --class people    # one class
    ./31-populate-entity-mentions.py --limit 100       # smoke
    ./31-populate-entity-mentions.py --reset           # truncate entity_mentions first
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
import psycopg
|
|
except ImportError as e:
|
|
sys.stderr.write(f"pip3 install pyyaml psycopg[binary] # missing: {e}\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Base directory that holds the wiki/ tree; override with the UFO_ROOT
# environment variable.
UFO_ROOT = Path(os.environ.get("UFO_ROOT", "/Users/guto/ufo"))
WIKI = UFO_ROOT.joinpath("wiki")

# Connection string handed to psycopg.connect(); DATABASE_URL takes precedence
# over SUPABASE_DB_URL.
DATABASE_URL = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL")

# Folder name under wiki/entities/ -> singular value written to
# public.entities.entity_class.
CLASS_SINGULAR = dict(
    [
        ("people", "person"),
        ("organizations", "organization"),
        ("locations", "location"),
        ("events", "event"),
        ("uap-objects", "uap_object"),
        ("vehicles", "vehicle"),
        ("operations", "operation"),
        ("concepts", "concept"),
    ]
)

# Class folders in processing order (same order as the mapping above); also
# the accepted values of the --class option.
CLASSES = list(CLASS_SINGULAR)
|
|
|
|
|
|
def read_frontmatter(path: Path) -> dict:
    """Return the YAML frontmatter of a Markdown file as a dict.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        The parsed frontmatter mapping. ``{}`` when the file does not start
        with ``---`` or when the frontmatter is empty.

    Raises:
        ValueError: The opening ``---`` is never closed by a ``---`` line, or
            the frontmatter parses to something other than a mapping.
        yaml.YAMLError: The frontmatter is not valid YAML.
    """
    raw = path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}

    lines = raw.splitlines()
    # The closing delimiter has to be a line of its own. A plain substring
    # search would stop at a "---" embedded in a value, and a failed search
    # (-1) would silently slice off the last character and parse the whole
    # document body as YAML.
    closing = next(
        (idx for idx, line in enumerate(lines[1:], start=1) if line.rstrip() == "---"),
        None,
    )
    if closing is None:
        raise ValueError("frontmatter has no closing '---' line")

    # Anything trailing the opening "---" on the first line stays part of the
    # YAML text, as it did before.
    yaml_text = "\n".join([lines[0][3:], *lines[1:closing]])
    data = yaml.safe_load(yaml_text.strip())
    if not data:
        return {}
    if not isinstance(data, dict):
        raise ValueError(f"frontmatter is not a mapping (got {type(data).__name__})")
    return data
|
|
|
|
|
|
def collect_search_strings(canonical: str, aliases: list[str]) -> list[str]:
|
|
"""Aliases + canonical, normalized — used as ILIKE patterns."""
|
|
items = set()
|
|
if canonical:
|
|
items.add(canonical.strip())
|
|
for a in aliases or []:
|
|
a = str(a).strip()
|
|
if not a:
|
|
continue
|
|
items.add(a)
|
|
# Filter: very short or all-numeric strings are too noisy
|
|
out: list[str] = []
|
|
for s in items:
|
|
if len(s) < 3:
|
|
continue
|
|
if s.isdigit():
|
|
continue
|
|
# SQL ILIKE escape — % and _ are wildcards
|
|
out.append(s.replace("%", r"\%").replace("_", r"\_"))
|
|
return out
|
|
|
|
|
|
def upsert_entity(cur, cls_folder: str, entity_id: str, fm: dict) -> int | None:
    """Insert or refresh one row of public.entities and return its key.

    Args:
        cur: Open database cursor; the statement runs in the caller's
            transaction and is not committed here.
        cls_folder: Class folder name (e.g. ``"people"``); mapped to its
            singular form via CLASS_SINGULAR, or used unchanged when unmapped.
        entity_id: Entity identifier; also the fallback canonical name.
        fm: Parsed frontmatter. Reads ``canonical_name``, ``aliases``,
            ``total_mentions``, ``documents_count``, ``enrichment_status``
            and ``last_ingest``.

    Returns:
        The ``entity_pk`` returned by the statement, or ``None`` when no row
        comes back.
    """
    # str() because YAML may hand back a non-string scalar (a bare year parses
    # as int). A blank name falls back to the id instead of being stored as "".
    canonical = str(fm.get("canonical_name") or "").strip() or entity_id.strip()

    raw_aliases = fm.get("aliases") or []
    if not isinstance(raw_aliases, list):
        raw_aliases = [raw_aliases]
    # Empty YAML list items arrive as None; stringifying them would store the
    # literal alias "None".
    stripped = [str(a).strip() for a in raw_aliases if a is not None]
    aliases = [a for a in stripped if a]

    total_mentions = int(fm.get("total_mentions") or 0)
    documents_count = int(fm.get("documents_count") or 0)
    enrichment_status = fm.get("enrichment_status")
    last_ingest = fm.get("last_ingest")
    entity_class = CLASS_SINGULAR.get(cls_folder, cls_folder)

    cur.execute(
        """
        INSERT INTO public.entities (
            entity_class, entity_id, canonical_name, aliases,
            total_mentions, documents_count, enrichment_status, last_ingest
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (entity_class, entity_id) DO UPDATE SET
            canonical_name = EXCLUDED.canonical_name,
            aliases = EXCLUDED.aliases,
            total_mentions = EXCLUDED.total_mentions,
            documents_count = EXCLUDED.documents_count,
            enrichment_status = EXCLUDED.enrichment_status,
            last_ingest = EXCLUDED.last_ingest
        RETURNING entity_pk
        """,
        (
            entity_class,
            entity_id,
            canonical,
            aliases,
            total_mentions,
            documents_count,
            enrichment_status,
            last_ingest,
        ),
    )
    row = cur.fetchone()
    return row[0] if row else None
|
|
|
|
|
|
def find_mentioning_chunks(cur, entity_pk: int, patterns: list[str]) -> int:
    """Link an entity to every chunk whose text contains one of its patterns.

    Runs one INSERT ... SELECT per pattern against public.chunks, matching
    the pattern as a substring of content_pt or content_en with ILIKE, and
    records the pattern as the row's surface_form. Pairs that already exist
    in public.entity_mentions are skipped by ON CONFLICT DO NOTHING.

    Returns the sum of the cursor's rowcount over all statements.
    """
    # "%%" is a literal percent sign in a parameterized query, so each
    # pattern is wrapped as %pattern% on the SQL side.
    statement = """
        INSERT INTO public.entity_mentions (chunk_pk, entity_pk, surface_form)
        SELECT chunk_pk, %s, %s
        FROM public.chunks
        WHERE content_pt ILIKE '%%' || %s || '%%'
        OR content_en ILIKE '%%' || %s || '%%'
        ON CONFLICT (chunk_pk, entity_pk) DO NOTHING
        """
    total = 0
    for surface in patterns:
        params = (entity_pk, surface, surface, surface)
        cur.execute(statement, params)
        affected = cur.rowcount
        if affected:
            total += affected
    return total
|
|
|
|
|
|
def main():
    """CLI entry point: upsert entities and materialize their chunk mentions.

    For every selected class folder under WIKI/entities, reads each ``*.md``
    file's frontmatter, upserts the entity, scans the chunks for its search
    strings and commits once per entity. Files whose frontmatter cannot be
    read are reported and skipped.

    Options:
        --class   restrict the run to one class folder
        --limit   process at most N files per class (0 processes none)
        --reset   truncate public.entity_mentions before the run

    Exits with status 1 when no database URL is configured.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--class", dest="cls", default=None, choices=CLASSES, help="Filter to one class")
    ap.add_argument("--limit", type=int, default=None, help="Limit entities per class (smoke test)")
    ap.add_argument("--reset", action="store_true", help="Truncate entity_mentions before run")
    args = ap.parse_args()

    # A negative slice bound would silently drop files from the end.
    if args.limit is not None and args.limit < 0:
        ap.error("--limit must be >= 0")

    if not DATABASE_URL:
        sys.stderr.write("✗ Set DATABASE_URL\n")
        sys.exit(1)

    target_classes = [args.cls] if args.cls else CLASSES

    t0 = time.time()
    total_entities = 0
    total_mentions = 0

    with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
        if args.reset:
            with conn.cursor() as cur:
                cur.execute("TRUNCATE public.entity_mentions RESTART IDENTITY")
                print(" ✓ TRUNCATE entity_mentions")
            conn.commit()

        for cls_folder in target_classes:
            cls_dir = WIKI / "entities" / cls_folder
            if not cls_dir.is_dir():
                print(f" ⊘ {cls_folder} dir missing")
                continue
            files = sorted(cls_dir.glob("*.md"))
            # Compare against None so that "--limit 0" really means zero files.
            if args.limit is not None:
                files = files[: args.limit]
            print(f" ▸ {cls_folder}: {len(files)} entities")

            for i, fpath in enumerate(files):
                eid = fpath.stem
                try:
                    fm = read_frontmatter(fpath)
                    if not isinstance(fm, dict):
                        raise TypeError("frontmatter is not a mapping")
                except Exception as e:
                    # Broad on purpose: one unreadable file must not abort
                    # the whole batch.
                    print(f" ✗ {eid}: bad frontmatter ({e})")
                    continue

                with conn.cursor() as cur:
                    epk = upsert_entity(cur, cls_folder, eid, fm)
                    if epk is not None:
                        patterns = collect_search_strings(
                            fm.get("canonical_name") or eid,
                            fm.get("aliases") or [],
                        )
                        if patterns:
                            total_mentions += find_mentioning_chunks(cur, epk, patterns)
                # Commit every entity, including those with no usable search
                # strings; skipping the commit left their upsert pending in
                # the open transaction.
                conn.commit()
                if epk is not None:
                    total_entities += 1

                if (i + 1) % 500 == 0:
                    elapsed = round(time.time() - t0, 0)
                    print(f" [{i+1}/{len(files)}] {cls_folder} · {total_mentions} mentions · {int(elapsed)}s")

    print(f"\nDONE — {total_entities} entities · {total_mentions} mentions · {round(time.time() - t0, 1)}s")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|