diff --git a/scripts/30-index-chunks-to-db.py b/scripts/30-index-chunks-to-db.py index 25aa894..80910fc 100755 --- a/scripts/30-index-chunks-to-db.py +++ b/scripts/30-index-chunks-to-db.py @@ -200,7 +200,7 @@ def upsert_document(cur, doc_id: str, idx: dict, archive_path: Path) -> None: ) -def index_one_doc(cur, archive: Path, lang: str, batch_size: int) -> tuple[int, int]: +def index_one_doc(cur, archive: Path, lang: str, batch_size: int, no_embed: bool = False) -> tuple[int, int]: idx_path = archive / "_index.json" if not idx_path.exists(): return (0, 0) @@ -280,11 +280,14 @@ def index_one_doc(cur, archive: Path, lang: str, batch_size: int) -> tuple[int, ) ) - # Embed in batches - all_embeddings: list[list[float]] = [] - for i in range(0, len(texts_for_embed), batch_size): - batch = texts_for_embed[i : i + batch_size] - all_embeddings.extend(embed_batch(batch)) + # Embed in batches (or skip → NULL embeddings, filled in later by a backfill pass) + all_embeddings: list[list[float] | None] = [] + if no_embed: + all_embeddings = [None] * len(texts_for_embed) + else: + for i in range(0, len(texts_for_embed), batch_size): + batch = texts_for_embed[i : i + batch_size] + all_embeddings.extend(embed_batch(batch)) # Bulk insert with vectors (cast text → vector in SQL) insert_sql = """ @@ -307,10 +310,13 @@ def index_one_doc(cur, archive: Path, lang: str, batch_size: int) -> tuple[int, %s, %s, %s, %s::vector ) """ + n_embedded = 0 for row, vec in zip(rows, all_embeddings): - cur.execute(insert_sql, row + (vector_literal(vec),)) + cur.execute(insert_sql, row + (vector_literal(vec) if vec is not None else None,)) + if vec is not None: + n_embedded += 1 - return (len(rows), len(all_embeddings)) + return (len(rows), n_embedded) def is_already_indexed(cur, doc_id: str) -> bool: @@ -327,20 +333,25 @@ def main(): ap.add_argument("--lang", choices=["pt", "en"], default="pt", help="Language to embed (default: pt)") ap.add_argument("--batch-size", type=int, default=16) 
ap.add_argument("--skip-existing", action="store_true") + ap.add_argument("--no-embed", action="store_true", + help="Insert chunks with NULL embeddings (fast); backfill vectors later") args = ap.parse_args() if not DATABASE_URL: sys.stderr.write("✗ Set DATABASE_URL (or SUPABASE_DB_URL) env var\n") sys.exit(1) - # Probe embed service - try: - r = requests.get(f"{EMBED_URL}/health", timeout=10) - r.raise_for_status() - print(f" ✓ embed service: {EMBED_URL} → {r.json()}") - except Exception as e: - sys.stderr.write(f"✗ embed service unreachable at {EMBED_URL}: {e}\n") - sys.exit(1) + # Probe embed service (skipped in --no-embed mode) + if not args.no_embed: + try: + r = requests.get(f"{EMBED_URL}/health", timeout=10) + r.raise_for_status() + print(f" ✓ embed service: {EMBED_URL} → {r.json()}") + except Exception as e: + sys.stderr.write(f"✗ embed service unreachable at {EMBED_URL}: {e}\n") + sys.exit(1) + else: + print(" ⚠ --no-embed: chunks indexed with NULL vectors (BM25 only until backfill)") archives = discover_built_docs() if args.doc_id: @@ -365,7 +376,7 @@ def main(): t_doc = time.time() try: with conn.cursor() as cur: - n_chunks, n_embed = index_one_doc(cur, archive, args.lang, args.batch_size) + n_chunks, n_embed = index_one_doc(cur, archive, args.lang, args.batch_size, args.no_embed) conn.commit() wall = round(time.time() - t_doc, 1) print(f" ✓ {doc_id} · {n_chunks} chunks · {n_embed} embedded · {wall}s") diff --git a/scripts/maintain/57_load_relations_from_json.py b/scripts/maintain/57_load_relations_from_json.py new file mode 100644 index 0000000..520e02f --- /dev/null +++ b/scripts/maintain/57_load_relations_from_json.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +""" +57_load_relations_from_json.py — Build typed relations for public.relations from +the reextract data, using ONLY verifiable references (Locard / absolute +provenance — no fuzzy guessing). + +Two sources, combined and deduped: + + A. 
STRUCTURAL relations derived from each raw/{doc_id}--subagent/_reextract.json + events[] (deterministic event_id, names resolved against real entities): + - event.observers[] → (person, witnessed, event) + - event (each) → (event, documented_in, document) + - event.uap_objects_observed → (event, involves_uap, uap_object) + - event.primary_location_name → (event, occurred_at, location) [substring + match against this doc's locations[]] + - people[] (each) → (person, mentioned_by, document) + + B. EXPLICIT relations[] from the same JSON that resolve EXACTLY (both + endpoints found in the entity name→id index): captures person↔org + (employed_by, signed, authored, commanded), etc. + +ID generation mirrors scripts/synthesize/30_rebuild_wiki_from_reextract.py so +event_id / person_id / uap_object_id match the entities table exactly. + +Run (DATABASE_URL must point at target Postgres): + DATABASE_URL=postgresql://... python3 scripts/maintain/57_load_relations_from_json.py [--truncate] +""" +from __future__ import annotations +import json +import os +import re +import sys +import unicodedata +from pathlib import Path + +import psycopg +import yaml + +UFO = Path(os.environ.get("UFO_ROOT", "/Users/guto/ufo")) +RAW = UFO / "raw" +ENT = UFO / "wiki" / "entities" + +CLASS_DIR = { + "person": "people", + "organization": "organizations", + "location": "locations", + "event": "events", + "uap_object": "uap-objects", +} + + +# ── ID generation (mirror of synthesize/30) ───────────────────────────────── +def canonicalize_name(name: str) -> str: + if not name: + return "" + nfd = unicodedata.normalize("NFD", name) + ascii_str = "".join(c for c in nfd if not unicodedata.combining(c)) + low = ascii_str.lower() + rep = re.sub(r"[^a-z0-9-]", "-", low) + col = re.sub(r"-+", "-", rep).strip("-") + if col and col[0].isdigit(): + col = "x-" + col + return col + + +def event_id_from(label: str, date_start: str | None) -> str: + slug = canonicalize_name(label or "")[:40].strip("-") or "unlabeled" + date = 
date_start or "" + m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", date) + if m: return f"EV-{m.group(1)}-{m.group(2)}-{m.group(3)}-{slug}" + m = re.match(r"^(\d{4})-(\d{2})$", date) + if m: return f"EV-{m.group(1)}-{m.group(2)}-XX-{slug}" + m = re.match(r"^(\d{4})$", date) + if m: return f"EV-{m.group(1)}-XX-XX-{slug}" + return f"EV-XXXX-XX-XX-{slug}" + + +def uap_object_id(event_id: str, index: int) -> str: + if event_id.startswith("EV-"): + parts = event_id[3:].split("-", 4) + if len(parts) >= 4: + year = parts[0] + slug = "-".join(parts[3:]) + compact = re.sub(r"[^A-Z0-9]", "", slug.upper())[:20] or "UNK" + return f"OBJ-EV{year}-{compact}-{index:02d}" + return f"OBJ-UNK-{index:02d}" + + +def lower(s: str) -> str: + return (s or "").strip().lower() + + +def parse_frontmatter(path: Path) -> dict | None: + try: + text = path.read_text(encoding="utf-8") + if not text.startswith("---"): + return None + return yaml.safe_load(text.split("---", 2)[1]) or {} + except Exception: + return None + + +def build_name_index() -> dict[str, dict[str, str]]: + """Per class: {name_or_alias_lower: entity_id} from real entity files.""" + index: dict[str, dict[str, str]] = {c: {} for c in CLASS_DIR} + for cls, dirname in CLASS_DIR.items(): + d = ENT / dirname + if not d.is_dir(): + continue + for f in d.glob("*.md"): + eid = f.stem + fm = parse_frontmatter(f) + if not fm: + index[cls].setdefault(eid, eid) + continue + for n in [fm.get("canonical_name")] + (fm.get("aliases") or []): + k = lower(n) + if k and k not in index[cls]: + index[cls][k] = eid + return index + + +def entity_id_sets(index) -> dict[str, set]: + return {cls: set(m.values()) for cls, m in index.items()} + + +def main() -> int: + truncate = "--truncate" in sys.argv + dburl = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL") + if not dburl: + sys.exit("DATABASE_URL not set") + + print("Building name→id index from wiki/entities ...") + index = build_name_index() + ids = entity_id_sets(index) + for cls in 
CLASS_DIR: + print(f" {cls}: {len(index[cls])} keys / {len(ids[cls])} ids") + + rows: list[tuple] = [] + + def add(sc, sid, rtype, tc, tid, doc_id, conf): + if not (sid and tid): + return + ev = f"[[{doc_id}]]" if doc_id else None + rows.append((sc, sid, rtype, tc, tid, ev, conf, "reextract")) + + def resolve(cls, name): + if cls == "document": + return (name or "").strip() or None + return index.get(cls, {}).get(lower(name)) + + n_docs = 0 + for jf in sorted(RAW.glob("*--subagent/_reextract.json")): + doc_id = jf.parent.name.removesuffix("--subagent") + try: + d = json.loads(jf.read_text(encoding="utf-8")) + except Exception: + continue + n_docs += 1 + + # locations declared in this doc (clean names) → for substring match + doc_locs = [] + for l in d.get("locations") or []: + nm = (l.get("name") or "").strip() + lid = canonicalize_name(nm) + if nm and lid in ids["location"]: + doc_locs.append((nm.lower(), lid)) + # longest names first (more specific match) + doc_locs.sort(key=lambda x: -len(x[0])) + + # A. 
structural from events[] + for e in d.get("events") or []: + label = (e.get("label") or "").strip() + if not label: + continue + eid = event_id_from(label, e.get("date_start")) + if eid not in ids["event"]: + continue # event entity must exist + conf = e.get("confidence") or "medium" + + # event documented_in document + add("event", eid, "documented_in", "document", doc_id, doc_id, "high") + + # observers witnessed event + for o in e.get("observers") or []: + nm = o.get("name") if isinstance(o, dict) else o + if nm and lower(nm) != "unknown": + pid = index["person"].get(lower(nm)) or ( + canonicalize_name(nm) if canonicalize_name(nm) in ids["person"] else None + ) + if pid: + add("person", pid, "witnessed", "event", eid, doc_id, conf) + + # uap_objects involves_uap + for i, u in enumerate(e.get("uap_objects_observed") or [], 1): + if not isinstance(u, dict): + continue + oid = uap_object_id(eid, i) + if oid in ids["uap_object"]: + add("event", eid, "involves_uap", "uap_object", oid, doc_id, conf) + + # event occurred_at location (substring match of doc locations) + ploc = lower(e.get("primary_location_name")) + if ploc: + for lname, lid in doc_locs: + if lname and lname in ploc: + add("event", eid, "occurred_at", "location", lid, doc_id, "medium") + break + + # people mentioned_by document + for p in d.get("people") or []: + nm = (p.get("name") or "").strip() + if nm and lower(nm) != "unknown": + pid = index["person"].get(lower(nm)) + if pid: + add("person", pid, "mentioned_by", "document", doc_id, doc_id, "medium") + + # B. 
explicit relations[] that resolve exactly + for r in d.get("relations") or []: + if not isinstance(r, dict): + continue + sc, tc, rtype = r.get("source_class"), r.get("target_class"), r.get("type") + if not (sc and tc and rtype): + continue + # no relation type is filtered out here; rows that repeat a structural relation exactly are dropped by the dedupe pass below + sid = resolve(sc, r.get("source_name")) + tid = resolve(tc, r.get("target_name")) + if sid and tid: + add(sc, sid, rtype, tc, tid, doc_id, r.get("confidence") or "medium") + + print(f"\nProcessed {n_docs} docs; raw relation rows: {len(rows)}") + + # dedupe by (source, type, target) — keep first (evidence may vary) + seen: set[tuple] = set() + deduped: list[tuple] = [] + for row in rows: + key = (row[0], row[1], row[2], row[3], row[4]) + if key in seen: + continue + seen.add(key) + deduped.append(row) + print(f"After dedup: {len(deduped)}") + if not deduped: + return 0 + + with psycopg.connect(dburl) as conn: + with conn.cursor() as cur: + if truncate: + cur.execute("TRUNCATE public.relations") + print(" TRUNCATEd public.relations") + cur.execute("CREATE TEMP TABLE _rel (LIKE public.relations INCLUDING DEFAULTS)") + with cur.copy( + """COPY _rel (source_class, source_id, relation_type, + target_class, target_id, evidence_ref, + confidence, extracted_by) FROM STDIN""" + ) as cp: + for row in deduped: + cp.write_row(row) + cur.execute( + """INSERT INTO public.relations + (source_class, source_id, relation_type, + target_class, target_id, evidence_ref, + confidence, extracted_by) + SELECT source_class, source_id, relation_type, + target_class, target_id, evidence_ref, + confidence, extracted_by + FROM _rel ON CONFLICT DO NOTHING""" + ) + print(f"Inserted (after ON CONFLICT): {cur.rowcount}") + cur.execute( + "SELECT relation_type, COUNT(*) FROM public.relations GROUP BY relation_type ORDER BY 2 DESC" + ) + print("\n=== Relation counts in DB ===") + for t, n in cur.fetchall(): + print(f" {n:>7} {t}") + conn.commit() + return 0 + + +if __name__ == "__main__": + 
sys.exit(main()) diff --git a/scripts/maintain/58_backfill_embeddings.py b/scripts/maintain/58_backfill_embeddings.py new file mode 100644 index 0000000..fc64490 --- /dev/null +++ b/scripts/maintain/58_backfill_embeddings.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +""" +58_backfill_embeddings.py — Fill in chunks.embedding for rows inserted with +NULL vectors (by 30-index-chunks-to-db.py --no-embed). Runs independently of +the fast index pass so the web app is usable (BM25) while dense vectors are +computed in the background. + +Processes chunks WHERE embedding IS NULL in batches, calling the embed-service, +and UPDATEs each row. Resumable: re-run to continue where it left off. + +Run (inside disclosure-internal network, or with tunnels): + DATABASE_URL=postgresql://postgres:...@db:5432/postgres \ + EMBED_SERVICE_URL=http://embed:8000 \ + python3 scripts/maintain/58_backfill_embeddings.py [--lang pt] [--batch-size 16] +""" +from __future__ import annotations +import argparse +import os +import sys +import time + +try: + import psycopg + import requests +except ImportError as e: + sys.exit(f"pip install psycopg[binary] requests # missing: {e}") + +DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL") +EMBED_URL = os.getenv("EMBED_SERVICE_URL", "http://localhost:8000") + + +def embed_batch(texts: list[str]) -> list[list[float]]: + resp = requests.post(f"{EMBED_URL}/embed", json={"texts": texts}, timeout=120) + resp.raise_for_status() + return resp.json()["embeddings"] + + +def vector_literal(vec: list[float]) -> str: + return "[" + ",".join(repr(float(x)) for x in vec) + "]" + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--lang", choices=["pt", "en"], default="pt") + ap.add_argument("--batch-size", type=int, default=16) + args = ap.parse_args() + + if not DATABASE_URL: + sys.exit("DATABASE_URL not set") + + # health probe + r = requests.get(f"{EMBED_URL}/health", timeout=10) + r.raise_for_status() + print(f"embed 
service: {EMBED_URL} → {r.json()}") + + field = "content_pt" if args.lang == "pt" else "content_en" + other = "content_en" if args.lang == "pt" else "content_pt" + + with psycopg.connect(DATABASE_URL, autocommit=False) as conn: + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM public.chunks WHERE embedding IS NULL") + total = cur.fetchone()[0] + print(f"chunks needing embedding: {total}") + if total == 0: + return 0 + + done = 0 + t0 = time.time() + while True: + with conn.cursor() as cur: + cur.execute( + f"""SELECT chunk_pk, COALESCE(NULLIF({field}, ''), {other}, '') + FROM public.chunks WHERE embedding IS NULL + ORDER BY chunk_pk LIMIT %s""", + (args.batch_size,), + ) + batch = cur.fetchall() + if not batch: + break + pks = [b[0] for b in batch] + texts = [b[1] or "" for b in batch] + try: + vecs = embed_batch(texts) + except Exception as e: + print(f" embed error at pk {pks[0]}: {e} — retrying once in 5s") + time.sleep(5) + vecs = embed_batch(texts) + with conn.cursor() as cur: + for pk, vec in zip(pks, vecs): + cur.execute( + "UPDATE public.chunks SET embedding = %s::vector WHERE chunk_pk = %s", + (vector_literal(vec), pk), + ) + conn.commit() + done += len(batch) + if done % 320 == 0 or done >= total: + rate = done / max(1e-6, time.time() - t0) + eta = (total - done) / max(1e-6, rate) + print(f" {done}/{total} · {rate:.0f}/s · ETA {eta/60:.0f}min", flush=True) + + print(f"✓ backfill complete: {done} embeddings in {(time.time()-t0)/60:.1f}min") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/web/app/d/[docId]/[page]/page.tsx b/web/app/d/[docId]/[page]/page.tsx index 87fef64..da37656 100644 --- a/web/app/d/[docId]/[page]/page.tsx +++ b/web/app/d/[docId]/[page]/page.tsx @@ -8,7 +8,7 @@ import Link from "next/link"; import Image from "next/image"; import { notFound } from "next/navigation"; import { readChunksByPage, readIndex, hasChunks } from "@/lib/chunks"; -import { readDocument } from "@/lib/wiki"; +import { 
readDocument, listDocuments } from "@/lib/wiki"; import { AuthBar } from "@/components/auth-bar"; import { ChatBubble } from "@/components/chat-bubble"; import { DocRendererV2 } from "@/components/doc-renderer-v2"; @@ -45,10 +45,11 @@ export default async function DocPageView({ ); } - const [idx, byPage, doc] = await Promise.all([ + const [idx, byPage, doc, docList] = await Promise.all([ readIndex(docId), readChunksByPage(docId), readDocument(docId), + listDocuments(), ]); if (!idx) notFound(); @@ -56,6 +57,61 @@ export default async function DocPageView({ const pngUrl = `/api/static/processing/png/${docId}/p-${m[1]}.png`; const totalPages = idx.total_pages; + // ── Navigation: prev/next page within doc; at boundaries, prev/next document ── + const pp = (n: number) => `p${String(n).padStart(3, "0")}`; + const docIdx = docList.indexOf(docId); + const prevDoc = docIdx > 0 ? docList[docIdx - 1] : null; + const nextDoc = docIdx >= 0 && docIdx < docList.length - 1 ? docList[docIdx + 1] : null; + + const prevNav = + pageNum > 1 + ? { href: `/d/${docId}/${pp(pageNum - 1)}`, label: `← página ${pageNum - 1}`, kind: "page" as const } + : prevDoc + ? { href: `/d/${prevDoc}/${pp(1)}`, label: "← documento anterior", kind: "doc" as const } + : null; + const nextNav = + pageNum < totalPages + ? { href: `/d/${docId}/${pp(pageNum + 1)}`, label: `página ${pageNum + 1} →`, kind: "page" as const } + : nextDoc + ? { href: `/d/${nextDoc}/${pp(1)}`, label: "próximo documento →", kind: "doc" as const } + : null; + + const navBar = ( + + ); + return (
@@ -77,6 +133,8 @@ export default async function DocPageView({ +
{navBar}
+
-
- 1 ? `/d/${docId}/p${String(pageNum - 1).padStart(3, "0")}` : "#"} - className={pageNum > 1 ? "hover:text-[#00ff9c]" : "opacity-30 pointer-events-none"} - > - ← p{pageNum - 1} - - - {pageNum} / {totalPages} - - - p{pageNum + 1} → - -
@@ -136,6 +171,8 @@ export default async function DocPageView({
+
{navBar}
+
);