disclosure-bureau/scripts/30-index-chunks-to-db.py
Luiz Gustavo fe19bb9c57 add page↔document navigation + DB repopulation tooling
Doc page (/d/[docId]/[page]) gains prev/next navigation bars (top + bottom):
within a doc it steps page-by-page; at the first/last page it jumps to the
previous/next document. Replaces the disabled-at-boundary links.

Indexer tooling for the VPS repopulation:
- 30-index-chunks-to-db.py: add --no-embed (fast BM25-only index; vectors
  backfilled separately) so the app is usable in minutes, not hours of CPU
  embedding.
- 57_load_relations_from_json.py: load typed relations into public.relations
  from reextract structured fields (deterministic ids, no fuzzy guessing).
- 58_backfill_embeddings.py: async pass to fill chunks.embedding (NULL rows)
  via the embed-service.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 14:28:14 -03:00

393 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
"""
30-index-chunks-to-db.py — Populate Postgres `documents` + `chunks` tables from
raw/<doc-id>--subagent/{_index.json, chunks/c*.md}. Embeds each chunk via the
embed-service (BGE-M3, 1024-dim dense) and UPSERTs into pgvector.
Idempotent: re-running re-embeds + overwrites. Per-doc transaction.
Resumable: pass --skip-existing to skip docs already indexed.
Usage:
./30-index-chunks-to-db.py # all archived docs
./30-index-chunks-to-db.py --doc-id doc-342-... # single doc
./30-index-chunks-to-db.py --lang pt # which content field to embed (default: pt)
./30-index-chunks-to-db.py --skip-existing # incremental
./30-index-chunks-to-db.py --batch-size 16 # chunks per embed call
./30-index-chunks-to-db.py --no-embed # NULL vectors (BM25-only); backfill embeddings later
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, Optional
try:
import yaml
import psycopg
from psycopg.types.json import Jsonb
import requests
except ImportError as e:
sys.stderr.write(f"pip3 install pyyaml psycopg[binary] requests # missing: {e}\n")
sys.exit(1)
# Corpus checkout root; the default is the author's workstation path, so set
# UFO_ROOT explicitly on any other host.
UFO_ROOT = Path(os.environ.get("UFO_ROOT", "/Users/guto/ufo"))
RAW_ROOT = UFO_ROOT.joinpath("raw")
WIKI_DOCS = UFO_ROOT.joinpath("wiki", "documents")
# DATABASE_URL takes precedence; SUPABASE_DB_URL is the fallback name.
DATABASE_URL = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL")
EMBED_URL = os.environ.get("EMBED_SERVICE_URL", "http://localhost:8000")
def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    stamp = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
    # isoformat renders the UTC offset as "+00:00"; the wanted form uses "Z".
    return stamp.replace("+00:00", "Z")
# Free-text frontmatter keys that the Sonnet chunk generator sometimes writes
# unquoted. A colon inside such a value makes yaml.safe_load fail, so lines
# for these keys are defensively quoted before parsing.
import re as _re  # aliased and imported here: only this frontmatter-repair section uses regex

STRING_FIELDS_NEEDING_QUOTE = {
    "ufo_anomaly_rationale",
    "cryptid_anomaly_rationale",
    "image_description_en",
    "image_description_pt_br",
    "extracted_text",
    "redaction_inferred_content_type",
}

# Alternation of the escaped keys; sorted so the compiled pattern text is
# stable from run to run (set iteration order is not).
_KEY_ALTERNATION = "|".join(map(_re.escape, sorted(STRING_FIELDS_NEEDING_QUOTE)))
# Groups: (1) leading indentation, (2) key, (3) raw value after "key:".
_STRING_FIELD_LINE_RE = _re.compile(rf"^(\s*)({_KEY_ALTERNATION}):\s*(.*)$")
def _autoquote_free_text(yaml_text: str) -> str:
    """Repair chunk frontmatter so that ``yaml.safe_load`` can parse it.

    Two idempotent repairs are applied:

    1. Unquoted values of the keys matched by ``_STRING_FIELD_LINE_RE`` are
       wrapped in single quotes, with embedded ``'`` doubled per YAML rules.
       A colon inside the value then no longer starts a nested mapping.
       Values that are empty, a null spelling (``null``/``None``/``~``),
       already quoted, or a block-scalar indicator (``|``/``>``) are left
       untouched.
    2. A Python-style ``None`` that ends a ``key: value`` line becomes YAML
       ``null``, so downstream float()/int() coercion sees None rather than
       the string "None".

    Quoting runs FIRST. A quoted value ends with ``'``, so step 2 can no
    longer rewrite text inside a free-text value such as
    ``extracted_text: Answer: None``. The previous order turned that value
    into ``'Answer: null'``.
    """
    out = []
    for line in yaml_text.split("\n"):
        m = _STRING_FIELD_LINE_RE.match(line)
        if m:
            indent, key, value = m.group(1), m.group(2), m.group(3).rstrip()
            already_safe = value in ("", "null", "None", "~") or value.startswith(
                ("'", '"', "|", ">")
            )
            if not already_safe:
                escaped = value.replace("'", "''")
                line = f"{indent}{key}: '{escaped}'"
        out.append(line)
    quoted = "\n".join(out)
    return _re.sub(r":[ \t]+None([ \t]*)$", r": null\1", quoted, flags=_re.MULTILINE)
def read_chunk_md(path: Path) -> tuple[dict, str, str]:
    """Parse a chunk markdown file into ``(frontmatter, content_en, content_pt)``.

    The file is expected to start with a ``---`` YAML frontmatter block.
    After it, the body lines prefixed ``**EN:**`` / ``**PT-BR:**`` carry the
    English / Portuguese text. Only the single line after each prefix is
    captured, and the last occurrence wins.

    Returns ``({}, "", "")`` when the file does not start with ``---``.
    The frontmatter is ``{}`` when it is unterminated, is invalid YAML
    (even after ``_autoquote_free_text``), or does not load as a mapping.
    """
    raw = path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, "", ""
    # The closing delimiter must begin a line. A bare find("---") could stop
    # at a "---" inside a free-text value, and a missing delimiter made it
    # return -1, which sliced garbage.
    end = raw.find("\n---", 3)
    if end == -1:
        # Unterminated frontmatter: no usable metadata, but still scan the
        # rest of the file for the EN/PT content lines.
        fm: dict = {}
        body = raw[3:]
    else:
        fm_yaml = _autoquote_free_text(raw[3:end].strip())
        try:
            loaded = yaml.safe_load(fm_yaml)
        except yaml.YAMLError:
            loaded = None
        # Callers use fm.get(...), so anything but a mapping is discarded.
        fm = loaded if isinstance(loaded, dict) else {}
        # end points at the "\n" before the closing "---"; skip all four chars.
        body = raw[end + 4:].lstrip("\n")
    en, pt = "", ""
    for line in body.split("\n"):
        s = line.strip()
        if s.startswith("**EN:**"):
            en = s.removeprefix("**EN:**").strip()
        elif s.startswith("**PT-BR:**"):
            pt = s.removeprefix("**PT-BR:**").strip()
    return fm, en, pt
def discover_built_docs() -> list[Path]:
    """List the ``RAW_ROOT/*--subagent`` archives that contain ``_index.json``.

    The result is sorted by path, so processing order is the same on every run.
    """
    candidates = RAW_ROOT.glob("*--subagent")
    ready = [candidate for candidate in candidates if candidate.joinpath("_index.json").exists()]
    ready.sort()
    return ready
def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed *texts* via the embed-service ``POST {EMBED_URL}/embed`` endpoint.

    The request body is ``{"texts": texts, "normalize": True}`` with a
    120 s timeout. The response's ``embeddings`` list is returned in input
    order. An empty input returns ``[]`` without any network call.
    NOTE: the module docstring says the model is BGE-M3 (1024-dim); the
    dimension of each vector is not checked here.

    Raises:
        requests.HTTPError: the service answered with a non-2xx status.
        ValueError: the response has no ``embeddings`` list, or the list
            does not contain exactly one vector per input text. Callers pair
            vectors with rows positionally, so a short list would otherwise
            silently drop rows.
    """
    if not texts:
        return []
    resp = requests.post(
        f"{EMBED_URL}/embed",
        json={"texts": texts, "normalize": True},
        timeout=120,
    )
    resp.raise_for_status()
    data = resp.json()
    embeddings = data.get("embeddings") if isinstance(data, dict) else None
    if not isinstance(embeddings, list):
        raise ValueError("embed-service response has no 'embeddings' list")
    if len(embeddings) != len(texts):
        raise ValueError(
            f"embed-service returned {len(embeddings)} embeddings for {len(texts)} texts"
        )
    return embeddings
def vector_literal(vec: list[float]) -> str:
    """Render *vec* in pgvector's text input form, e.g. ``[1.000000,-0.500000]``.

    Each component is formatted with six decimal places.
    """
    components = ",".join(map("{:.6f}".format, vec))
    return f"[{components}]"
def read_wiki_document_meta(doc_id: str) -> dict:
    """Return the YAML frontmatter of ``WIKI_DOCS/<doc_id>.md`` as a dict.

    ``upsert_document`` pulls canonical_title, collection, document_class,
    page_count, (highest_)classification and content_classification from it.

    This is best-effort. Each of the following yields ``{}``:
    - a missing file;
    - unreadable or non-UTF-8 text;
    - no leading ``---`` or no terminating ``---`` line;
    - invalid YAML;
    - YAML that is not a mapping. Callers use ``.get``, so a scalar or list
      result would otherwise crash them.
    """
    p = WIKI_DOCS / f"{doc_id}.md"
    if not p.exists():
        return {}
    try:
        raw = p.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return {}
    if not raw.startswith("---"):
        return {}
    # The closing delimiter must begin a line. find("---") could match inside
    # a value, and a missing delimiter returned -1, which sliced garbage.
    end = raw.find("\n---", 3)
    if end == -1:
        return {}
    try:
        fm = yaml.safe_load(raw[3:end].strip())
    except yaml.YAMLError:
        return {}
    return fm if isinstance(fm, dict) else {}
def upsert_document(cur, doc_id: str, idx: dict, archive_path: Path) -> None:
    """Insert or refresh the ``public.documents`` row for *doc_id*.

    Values come from the archive's parsed ``_index.json`` (*idx*) and the
    wiki frontmatter (``read_wiki_document_meta``):
    - canonical_title: idx, then wiki, then *doc_id*.
    - page_count: idx ``total_pages``, then wiki ``page_count``.
    - classification: wiki ``highest_classification``, then ``classification``.
    - content_class: wiki ``content_classification``, wrapped in a list when
      it is a scalar.
    - schema_version: defaults to "0.2.0".
    - raw_path: the resolved archive path.

    On conflict, every column is overwritten and ``ingested_at`` is reset to
    NOW(). *cur* is a DB cursor; the caller owns commit/rollback.
    """
    wiki_meta = read_wiki_document_meta(doc_id)
    if not isinstance(wiki_meta, dict):
        # Frontmatter that parsed to a scalar/list carries no usable fields.
        wiki_meta = {}
    canonical_title = (
        idx.get("canonical_title")
        or wiki_meta.get("canonical_title")
        or doc_id
    )
    content_class = wiki_meta.get("content_classification")
    if content_class is not None and not isinstance(content_class, list):
        content_class = [content_class]
    # NOTE(review): the column is built_at, but only the "build_at" index key
    # was read before. That key is kept first for compatibility, with a
    # fallback to "built_at" in case _index.json uses the column's spelling.
    built_at = idx.get("build_at") or idx.get("built_at")
    cur.execute(
        """
        INSERT INTO public.documents (
        doc_id, canonical_title, collection, document_class, page_count,
        classification, content_class,
        schema_version, build_approach, build_model, built_at, raw_path
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (doc_id) DO UPDATE SET
        canonical_title = EXCLUDED.canonical_title,
        collection = EXCLUDED.collection,
        document_class = EXCLUDED.document_class,
        page_count = EXCLUDED.page_count,
        classification = EXCLUDED.classification,
        content_class = EXCLUDED.content_class,
        schema_version = EXCLUDED.schema_version,
        build_approach = EXCLUDED.build_approach,
        build_model = EXCLUDED.build_model,
        built_at = EXCLUDED.built_at,
        raw_path = EXCLUDED.raw_path,
        ingested_at = NOW()
        """,
        (
            doc_id,
            canonical_title,
            wiki_meta.get("collection"),
            wiki_meta.get("document_class"),
            idx.get("total_pages") or wiki_meta.get("page_count"),
            wiki_meta.get("highest_classification") or wiki_meta.get("classification"),
            content_class,
            idx.get("schema_version", "0.2.0"),
            idx.get("build_approach"),
            idx.get("build_model"),
            built_at,
            str(archive_path.resolve()),
        ),
    )
def _scalar(v):
    """Coerce a frontmatter value into something psycopg can bind to a text column.

    None/str/int/float/bool pass through unchanged. dict/list/tuple become
    JSON text, or ``str(v)`` if not JSON-serialisable. Anything else becomes
    ``str(v)``. Sonnet sometimes emits a mapping where a scalar is expected
    (e.g. ``redaction_inferred_content_type: {kind: x, note: y}``).
    """
    if v is None or isinstance(v, (str, int, float, bool)):
        return v
    if isinstance(v, (dict, list, tuple)):
        try:
            return json.dumps(v, ensure_ascii=False)
        except (TypeError, ValueError):  # unserialisable member / circular ref
            return str(v)
    return str(v)


def _to_float_or_none(v):
    """Best-effort float for ``ocr_confidence``; None when missing or unparseable.

    A non-numeric string such as "high" used to raise and abort the whole doc.
    """
    if isinstance(v, (int, float)):
        return float(v)
    if isinstance(v, str) and v.strip() not in ("", "None", "null"):
        try:
            return float(v)
        except ValueError:
            return None
    return None


# String spellings treated as False for boolean frontmatter flags.
_FALSEY_FLAG_STRINGS = frozenset({"", "false", "no", "off", "0", "none", "null"})


def _to_bool(v) -> bool:
    """Interpret a frontmatter flag.

    A quoted ``"false"`` is a non-empty string, so plain ``bool()`` made it True.
    """
    if isinstance(v, str):
        return v.strip().lower() not in _FALSEY_FLAG_STRINGS
    return bool(v)


def _formatting_list(v) -> list[str]:
    """Normalise the ``formatting`` field to a list of strings, dropping None items.

    A bare scalar (``formatting: bold``) is treated as a one-item list.
    Iterating it directly would split a string into characters.
    """
    if not v:
        return []
    if isinstance(v, (str, int, float, bool)):
        v = [v]
    return [x if isinstance(x, str) else str(x) for x in v if x is not None]


def _chunk_row(doc_id: str, cid: str, entry: dict, fm: dict, en: str, pt: str) -> tuple:
    """Build the INSERT parameters for one chunk: every column except ``embedding``.

    For page/order/type/bbox, the chunk frontmatter wins over the
    ``_index.json`` entry; the remaining fields come from frontmatter only.
    """
    return (
        doc_id,
        cid,
        int(fm.get("page") or entry.get("page") or 0),
        int(fm.get("order_in_page") or entry.get("order_in_page") or 0),
        int(fm.get("order_global") or entry.get("order_global") or 0),
        str(fm.get("type") or entry.get("type") or "unknown"),
        Jsonb(fm.get("bbox") or entry.get("bbox") or {}),
        en or None,
        pt or None,
        _to_float_or_none(fm.get("ocr_confidence")),
        _scalar(fm.get("classification")),
        _formatting_list(fm.get("formatting")),
        _scalar(fm.get("cross_page_hint")),
        _scalar(fm.get("prev_chunk")),
        _scalar(fm.get("next_chunk")),
        _scalar(fm.get("related_image")),
        _scalar(fm.get("related_table")),
        _scalar(fm.get("redaction_code")),
        _scalar(fm.get("redaction_inferred_content_type")),
        _scalar(fm.get("image_type")),
        _to_bool(fm.get("ufo_anomaly_detected")),
        _scalar(fm.get("ufo_anomaly_type")),
        _scalar(fm.get("ufo_anomaly_rationale")),
        _to_bool(fm.get("cryptid_anomaly_detected")),
        _scalar(fm.get("cryptid_anomaly_type")),
        _scalar(fm.get("cryptid_anomaly_rationale")),
        _scalar(fm.get("image_description_en")),
        _scalar(fm.get("image_description_pt_br")),
        _scalar(fm.get("source_png")),
    )


def _collect_chunk_rows(doc_id: str, chunks_dir: Path, entries: list, lang: str) -> tuple[list[tuple], list[str]]:
    """Read every indexed chunk file and return ``(rows, texts_to_embed)`` in entry order.

    Entries without a ``chunk_id``, or whose ``<chunks_dir>/<id>.md`` is
    missing, are skipped. The embed text is the *lang* content, falling back
    to whichever language is present.
    """
    rows: list[tuple] = []
    texts: list[str] = []
    for entry in entries:
        cid = entry.get("chunk_id")
        if not cid:
            continue
        chunk_file = chunks_dir / f"{cid}.md"
        if not chunk_file.exists():
            continue
        fm, en, pt = read_chunk_md(chunk_file)
        preferred = pt if lang == "pt" else en
        texts.append(preferred or en or pt or "")
        rows.append(_chunk_row(doc_id, cid, entry, fm, en, pt))
    return rows, texts


def _embed_texts(texts: list[str], batch_size: int, no_embed: bool) -> list[list[float] | None]:
    """Return exactly one vector per text, or all None when *no_embed* is set.

    Raises:
        ValueError: *batch_size* < 1 while embedding. A negative step used to
            produce zero vectors, and zip() then dropped every row.
        RuntimeError: the total vector count does not match ``len(texts)``.
    """
    if no_embed:
        return [None] * len(texts)
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    vectors: list[list[float] | None] = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_batch(texts[start:start + batch_size]))
    if len(vectors) != len(texts):
        raise RuntimeError(
            f"embed-service returned {len(vectors)} vectors for {len(texts)} chunks"
        )
    return vectors


def index_one_doc(cur, archive: Path, lang: str, batch_size: int, no_embed: bool = False) -> tuple[int, int]:
    """(Re)index one ``raw/<doc-id>--subagent`` archive into documents + chunks.

    Steps:
    1. Read ``_index.json``.
    2. Parse every listed chunk file and embed the chosen language in batches
       of *batch_size*. With *no_embed*, embeddings are NULL and are
       backfilled later.
    3. Upsert the document row, delete the doc's existing chunks, and insert
       the fresh ones.

    Returns ``(chunks_inserted, chunks_with_embedding)``, or ``(0, 0)`` when
    there is no ``_index.json``. *cur* is a DB cursor; the caller commits or
    rolls back. Errors propagate (DB, embed-service, ValueError/RuntimeError
    from ``_embed_texts``).
    """
    idx_path = archive / "_index.json"
    if not idx_path.exists():
        return (0, 0)
    idx = json.loads(idx_path.read_text(encoding="utf-8"))
    doc_id = idx.get("doc_id") or archive.name.removesuffix("--subagent")

    # File parsing and (possibly slow) HTTP embedding happen BEFORE any DB
    # write, so the caller's transaction is not held open across embed calls.
    rows, texts_for_embed = _collect_chunk_rows(
        doc_id, archive / "chunks", idx.get("chunks") or [], lang
    )
    embeddings = _embed_texts(texts_for_embed, batch_size, no_embed)

    upsert_document(cur, doc_id, idx, archive)
    # Wipe + re-insert chunks for this doc (idempotency)
    cur.execute("DELETE FROM public.chunks WHERE doc_id = %s", (doc_id,))
    if not rows:
        return (0, 0)

    # The embedding arrives as pgvector text ('[..]') or NULL and is cast in SQL.
    insert_sql = """
    INSERT INTO public.chunks (
    doc_id, chunk_id, page, order_in_page, order_global, type, bbox,
    content_en, content_pt, ocr_confidence, classification, formatting,
    cross_page_hint, prev_chunk, next_chunk, related_image, related_table,
    redaction_code, redaction_inferred, image_type,
    ufo_anomaly, ufo_anomaly_type, ufo_rationale,
    cryptid_anomaly, cryptid_anomaly_type, cryptid_rationale,
    image_desc_en, image_desc_pt, source_png, embedding
    )
    VALUES (
    %s, %s, %s, %s, %s, %s, %s,
    %s, %s, %s, %s, %s,
    %s, %s, %s, %s, %s,
    %s, %s, %s,
    %s, %s, %s,
    %s, %s, %s,
    %s, %s, %s, %s::vector
    )
    """
    # _embed_texts guarantees len(embeddings) == len(rows), so zip drops nothing.
    params = [
        row + (vector_literal(vec) if vec is not None else None,)
        for row, vec in zip(rows, embeddings)
    ]
    cur.executemany(insert_sql, params)
    n_embedded = sum(1 for vec in embeddings if vec is not None)
    return (len(rows), n_embedded)
def is_already_indexed(cur, doc_id: str) -> bool:
    """Report whether ``public.documents`` already holds a row for *doc_id*.

    Only the documents table is checked. A document row can exist with zero
    chunks, e.g. when its ``_index.json`` listed none.
    """
    lookup = "SELECT 1 FROM public.documents WHERE doc_id = %s"
    cur.execute(lookup, (doc_id,))
    found = cur.fetchone()
    return found is not None
def main():
    """CLI entry point: index every built archive, or only ``--doc-id``.

    Steps:
    1. Parse and validate args, and require DATABASE_URL (or SUPABASE_DB_URL).
    2. Probe ``{EMBED_URL}/health`` unless ``--no-embed`` is given.
    3. Discover ``raw/*--subagent`` archives.
    4. Index each archive in its own transaction: commit on success, roll
       back on error.

    A failing document is reported on stderr and skipped. The final summary
    counts the failures.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", default=None, help="Index a single doc (no --subagent suffix)")
    ap.add_argument("--lang", choices=["pt", "en"], default="pt", help="Language to embed (default: pt)")
    ap.add_argument("--batch-size", type=int, default=16)
    ap.add_argument("--skip-existing", action="store_true")
    ap.add_argument("--no-embed", action="store_true",
                    help="Insert chunks with NULL embeddings (fast); backfill vectors later")
    args = ap.parse_args()
    # A batch size of 0 made every doc fail on range(); a negative one
    # produced no embeddings, so each doc's chunks were deleted but never
    # re-inserted. Reject both up front.
    if args.batch_size < 1:
        ap.error("--batch-size must be >= 1")
    if not DATABASE_URL:
        sys.stderr.write("✗ Set DATABASE_URL (or SUPABASE_DB_URL) env var\n")
        sys.exit(1)
    # Probe embed service (skipped in --no-embed mode)
    if not args.no_embed:
        try:
            r = requests.get(f"{EMBED_URL}/health", timeout=10)
            r.raise_for_status()
            print(f" ✓ embed service: {EMBED_URL} {r.json()}")
        except Exception as e:
            sys.stderr.write(f"✗ embed service unreachable at {EMBED_URL}: {e}\n")
            sys.exit(1)
    else:
        print(" ⚠ --no-embed: chunks indexed with NULL vectors (BM25 only until backfill)")
    archives = discover_built_docs()
    if args.doc_id:
        archives = [a for a in archives if a.name.removesuffix("--subagent") == args.doc_id]
        if not archives:
            sys.stderr.write(f"✗ doc not built yet: raw/{args.doc_id}--subagent missing\n")
            sys.exit(1)
    print(f" found {len(archives)} built doc(s)")
    t0 = time.time()
    total_chunks = 0
    total_docs = 0
    failed: list[str] = []
    with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
        for archive in archives:
            # NOTE(review): index_one_doc prefers _index.json's doc_id over the
            # directory name. If the two differ, --skip-existing never matches.
            doc_id = archive.name.removesuffix("--subagent")
            if args.skip_existing:
                with conn.cursor() as cur:
                    if is_already_indexed(cur, doc_id):
                        print(f" ⊘ skip {doc_id} (already indexed)")
                        continue
            t_doc = time.time()
            try:
                with conn.cursor() as cur:
                    n_chunks, n_embed = index_one_doc(cur, archive, args.lang, args.batch_size, args.no_embed)
                conn.commit()
                wall = round(time.time() - t_doc, 1)
                print(f"{doc_id} · {n_chunks} chunks · {n_embed} embedded · {wall}s")
                total_chunks += n_chunks
                total_docs += 1
            except Exception as e:
                # Per-doc boundary: undo this doc's partial writes and move on.
                conn.rollback()
                failed.append(doc_id)
                print(f"{doc_id} FAILED: {e}", file=sys.stderr)
    summary = f"\nDONE — {total_docs} docs · {total_chunks} chunks · {round(time.time() - t0, 1)}s total"
    if failed:
        summary += f" · {len(failed)} failed"
    print(summary)


if __name__ == "__main__":
    main()