Doc page (/d/[docId]/[page]) gains prev/next navigation bars (top + bottom): within a doc it steps page-by-page; at the first/last page it jumps to the previous/next document. Replaces the disabled-at-boundary links. Indexer tooling for the VPS repopulation: - 30-index-chunks-to-db.py: add --no-embed (fast BM25-only index; vectors backfilled separately) so the app is usable in minutes, not hours of CPU embedding. - 57_load_relations_from_json.py: load typed relations into public.relations from reextract structured fields (deterministic ids, no fuzzy guessing). - 58_backfill_embeddings.py: async pass to fill chunks.embedding (NULL rows) via the embed-service. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
393 lines
15 KiB
Python
Executable file
393 lines
15 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
30-index-chunks-to-db.py — Populate Postgres `documents` + `chunks` tables from
|
|
raw/<doc-id>--subagent/{_index.json, chunks/c*.md}. Embeds each chunk via the
|
|
embed-service (BGE-M3, 1024-dim dense) and UPSERTs into pgvector.
|
|
|
|
Idempotent: re-running re-embeds + overwrites. Per-doc transaction.
|
|
Resumable: pass --skip-existing to skip docs already indexed.
|
|
|
|
Usage:
|
|
./30-index-chunks-to-db.py # all archived docs
|
|
./30-index-chunks-to-db.py --doc-id doc-342-... # single doc
|
|
./30-index-chunks-to-db.py --lang pt # which content field to embed (default: pt)
|
|
./30-index-chunks-to-db.py --skip-existing # incremental
|
|
./30-index-chunks-to-db.py --batch-size 16 # chunks per embed call
./30-index-chunks-to-db.py --no-embed # insert chunks with NULL embeddings; backfill vectors later
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterable, Optional
|
|
|
|
try:
|
|
import yaml
|
|
import psycopg
|
|
from psycopg.types.json import Jsonb
|
|
import requests
|
|
except ImportError as e:
|
|
sys.stderr.write(f"pip3 install pyyaml psycopg[binary] requests # missing: {e}\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Filesystem layout: everything hangs off UFO_ROOT, which the environment may override.
UFO_ROOT = Path(os.environ.get("UFO_ROOT", "/Users/guto/ufo"))
RAW_ROOT = UFO_ROOT.joinpath("raw")
WIKI_DOCS = UFO_ROOT.joinpath("wiki", "documents")

# Service endpoints come from the environment. DATABASE_URL takes precedence;
# when it is unset or empty, SUPABASE_DB_URL is used instead (possibly None).
DATABASE_URL = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL")
EMBED_URL = os.environ.get("EMBED_SERVICE_URL", "http://localhost:8000")
|
|
|
|
|
|
def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = datetime.now(tz=timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
# Known free-text string fields in chunk frontmatter that may contain literal
|
|
# colons. The Sonnet generator sometimes leaves them unquoted, which breaks
|
|
# yaml.safe_load. We quote them defensively before parsing.
|
|
import re as _re # placed here to keep import scope tight; module-level `re` not needed elsewhere
|
|
|
|
STRING_FIELDS_NEEDING_QUOTE = {
|
|
"ufo_anomaly_rationale",
|
|
"cryptid_anomaly_rationale",
|
|
"image_description_en",
|
|
"image_description_pt_br",
|
|
"extracted_text",
|
|
"redaction_inferred_content_type",
|
|
}
|
|
_STRING_FIELD_LINE_RE = _re.compile(
|
|
r"^(\s*)(" + "|".join(_re.escape(k) for k in STRING_FIELDS_NEEDING_QUOTE) + r"):\s*(.*)$"
|
|
)
|
|
|
|
|
|
def _autoquote_free_text(yaml_text: str) -> str:
|
|
"""Wrap unquoted values of known string fields in single quotes to survive
|
|
YAML colons inside the value. Also normalises Python literals (`None`,
|
|
`True`, `False`) at end-of-line into YAML literals. Idempotent."""
|
|
# First pass: normalise Python-literal Nones to YAML nulls so downstream
|
|
# float() / int() coercion works.
|
|
yaml_text = _re.sub(r":[ \t]+None([ \t]*)$", r": null\1", yaml_text, flags=_re.MULTILINE)
|
|
|
|
out = []
|
|
for line in yaml_text.split("\n"):
|
|
m = _STRING_FIELD_LINE_RE.match(line)
|
|
if not m:
|
|
out.append(line)
|
|
continue
|
|
indent, key, value = m.group(1), m.group(2), m.group(3).rstrip()
|
|
if value in ("", "null", "None", "~") or value.startswith(("'", '"', "|", ">")):
|
|
out.append(line)
|
|
continue
|
|
escaped = value.replace("'", "''")
|
|
out.append(f"{indent}{key}: '{escaped}'")
|
|
return "\n".join(out)
|
|
|
|
|
|
def read_chunk_md(path: Path) -> tuple[dict, str, str]:
    """Parse one chunk markdown file into ``(frontmatter, content_en, content_pt)``.

    The file is expected to open with a ``---`` fenced YAML frontmatter block,
    followed by a body in which the English and Portuguese texts sit on lines
    prefixed ``**EN:**`` and ``**PT-BR:**``.

    Args:
        path: Chunk file to read (decoded as UTF-8).

    Returns:
        A tuple ``(fm, en, pt)``. ``fm`` is always a dict; it is empty when the
        frontmatter is unterminated, fails to parse, or does not parse to a
        mapping. ``en`` / ``pt`` hold the text following the respective prefix
        (the last matching line wins) or ``""`` when absent. A file that does
        not start with ``---`` yields ``({}, "", "")``.
    """
    raw = path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, "", ""

    # The closing fence must begin its own line. Searching for a bare "---"
    # would also match dashes embedded in a frontmatter value.
    fence = "\n---"
    end = raw.find(fence, 3)

    fm: dict = {}
    if end == -1:
        # Unterminated frontmatter: there is no reliable metadata, but the
        # content lines can still be harvested from whatever follows.
        body = raw[3:]
    else:
        fm_yaml = _autoquote_free_text(raw[3:end].strip())
        try:
            loaded = yaml.safe_load(fm_yaml)
        except (yaml.YAMLError, ValueError):
            # ValueError: PyYAML can raise it while constructing values such
            # as out-of-range dates; treat it like any other parse failure.
            loaded = None
        # Downstream code calls fm.get(...), so anything but a mapping is unusable.
        if isinstance(loaded, dict):
            fm = loaded
        body = raw[end + len(fence):]

    en, pt = "", ""
    for line in body.split("\n"):
        s = line.strip()
        if s.startswith("**EN:**"):
            en = s.removeprefix("**EN:**").strip()
        elif s.startswith("**PT-BR:**"):
            pt = s.removeprefix("**PT-BR:**").strip()
    return fm, en, pt
|
|
|
|
|
|
def discover_built_docs() -> list[Path]:
    """Return, sorted, every ``*--subagent`` entry under RAW_ROOT that contains an ``_index.json``."""
    candidates = RAW_ROOT.glob("*--subagent")
    return sorted(filter(lambda d: d.joinpath("_index.json").exists(), candidates))
|
|
|
|
|
|
def embed_batch(texts: list[str]) -> list[list[float]]:
    """Embed *texts* with a single POST to ``{EMBED_URL}/embed``.

    Args:
        texts: Strings to embed. An empty list short-circuits without any
            network call.

    Returns:
        The ``"embeddings"`` list from the service's JSON response, one vector
        per input text (the module docstring describes them as 1024-dim).

    Raises:
        requests.HTTPError: The service answered with an error status.
        KeyError: The response JSON has no ``"embeddings"`` key.
        RuntimeError: The number of vectors differs from the number of texts.
    """
    if not texts:
        return []
    resp = requests.post(
        f"{EMBED_URL}/embed",
        json={"texts": texts, "normalize": True},
        timeout=120,
    )
    resp.raise_for_status()
    embeddings = resp.json()["embeddings"]
    # Callers pair vectors with rows positionally; a short response would
    # otherwise make them silently drop the unmatched chunks.
    if len(embeddings) != len(texts):
        raise RuntimeError(
            f"embed service returned {len(embeddings)} vectors for {len(texts)} texts"
        )
    return embeddings
|
|
|
|
|
|
def vector_literal(vec: list[float]) -> str:
    """Render *vec* in the bracketed text form pgvector accepts, e.g. ``[1.000000,2.500000]``.

    Each component is formatted with six decimal places; an empty vector gives ``[]``.
    """
    components = [format(component, ".6f") for component in vec]
    return "[{}]".format(",".join(components))
|
|
|
|
|
|
def read_wiki_document_meta(doc_id: str) -> dict:
    """Load the YAML frontmatter of ``WIKI_DOCS/<doc_id>.md`` on a best-effort basis.

    The caller reads fields such as ``canonical_title``, ``collection``,
    ``document_class`` and ``content_classification`` from the result.

    Args:
        doc_id: Document identifier; also the wiki file's stem.

    Returns:
        The frontmatter mapping, or ``{}`` when the file is missing or
        unreadable, has no (or an unterminated) ``---`` fenced block, fails to
        parse, or parses to something other than a mapping.
    """
    p = WIKI_DOCS / f"{doc_id}.md"
    if not p.exists():
        return {}
    try:
        raw = p.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return {}
    if not raw.startswith("---"):
        return {}

    # The closing fence must start its own line; without one there is no
    # frontmatter to trust (str.find would return -1 and corrupt the slice).
    end = raw.find("\n---", 3)
    if end == -1:
        return {}

    try:
        fm = yaml.safe_load(raw[3:end].strip())
    except (yaml.YAMLError, ValueError):
        # ValueError: PyYAML can raise it while constructing values such as
        # out-of-range dates.
        return {}
    # Callers use .get(), so only a mapping is a usable result.
    return fm if isinstance(fm, dict) else {}
|
|
|
|
|
|
def upsert_document(cur, doc_id: str, idx: dict, archive_path: Path) -> None:
    """Insert or update the ``public.documents`` row for *doc_id*.

    Values are merged from the archive's index (*idx*) and from the wiki
    frontmatter returned by ``read_wiki_document_meta``. On a ``doc_id``
    conflict every listed column is overwritten and ``ingested_at`` is set to
    ``NOW()``.

    Args:
        cur: Database cursor used to execute the statement.
        doc_id: Primary identifier of the document.
        idx: Parsed ``_index.json`` content for the archive.
        archive_path: Archive directory; its resolved path is stored as ``raw_path``.
    """
    wiki_meta = read_wiki_document_meta(doc_id)

    # Title precedence: index, then wiki, then the bare doc id.
    title = idx.get("canonical_title") or wiki_meta.get("canonical_title") or doc_id

    # A lone classification value is wrapped in a list; None stays None.
    content_class = wiki_meta.get("content_classification")
    if not (content_class is None or isinstance(content_class, list)):
        content_class = [content_class]

    page_count = idx.get("total_pages") or wiki_meta.get("page_count")
    classification = wiki_meta.get("highest_classification") or wiki_meta.get("classification")

    statement = """
        INSERT INTO public.documents (
            doc_id, canonical_title, collection, document_class, page_count,
            classification, content_class,
            schema_version, build_approach, build_model, built_at, raw_path
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (doc_id) DO UPDATE SET
            canonical_title = EXCLUDED.canonical_title,
            collection = EXCLUDED.collection,
            document_class = EXCLUDED.document_class,
            page_count = EXCLUDED.page_count,
            classification = EXCLUDED.classification,
            content_class = EXCLUDED.content_class,
            schema_version = EXCLUDED.schema_version,
            build_approach = EXCLUDED.build_approach,
            build_model = EXCLUDED.build_model,
            built_at = EXCLUDED.built_at,
            raw_path = EXCLUDED.raw_path,
            ingested_at = NOW()
        """
    params = (
        doc_id,
        title,
        wiki_meta.get("collection"),
        wiki_meta.get("document_class"),
        page_count,
        classification,
        content_class,
        idx.get("schema_version", "0.2.0"),
        idx.get("build_approach"),
        idx.get("build_model"),
        # NOTE(review): the index key read here is `build_at` while the column
        # is `built_at` - confirm against the _index.json schema.
        idx.get("build_at"),
        str(archive_path.resolve()),
    )
    cur.execute(statement, params)
|
|
|
|
|
|
def _coerce_scalar(value):
    """Pass scalars through, serialise containers to JSON text, stringify anything else.

    The generator sometimes emits a mapping where a scalar is expected (e.g.
    ``redaction_inferred_content_type: {kind: x, note: y}``); this keeps such
    values bindable to text columns.
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, (dict, list, tuple)):
        try:
            return json.dumps(value, ensure_ascii=False)
        except (TypeError, ValueError):
            return str(value)
    return str(value)


def _coerce_confidence(value):
    """Turn an ``ocr_confidence`` value into a float, or None when blank, null-like or unparseable."""
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str) and value.strip() not in ("", "None", "null"):
        try:
            return float(value)
        except ValueError:
            # One non-numeric confidence string must not abort the whole document.
            return None
    return None


def _build_chunk_row(doc_id, cid, entry: dict, fm: dict, en: str, pt: str) -> tuple:
    """Assemble the positional INSERT parameters for one chunk (embedding excluded)."""

    def pick(key, default):
        # Frontmatter wins over the index entry; falsy values fall through.
        return fm.get(key) or entry.get(key) or default

    formatting = fm.get("formatting") or []
    if isinstance(formatting, str):
        # A lone string would otherwise be iterated character by character.
        formatting = [formatting]

    return (
        doc_id,
        cid,
        int(pick("page", 0)),
        int(pick("order_in_page", 0)),
        int(pick("order_global", 0)),
        str(pick("type", "unknown")),
        Jsonb(pick("bbox", {})),
        en or None,
        pt or None,
        _coerce_confidence(fm.get("ocr_confidence")),
        _coerce_scalar(fm.get("classification")),
        [item if isinstance(item, str) else str(item) for item in formatting if item is not None],
        _coerce_scalar(fm.get("cross_page_hint")),
        _coerce_scalar(fm.get("prev_chunk")),
        _coerce_scalar(fm.get("next_chunk")),
        _coerce_scalar(fm.get("related_image")),
        _coerce_scalar(fm.get("related_table")),
        _coerce_scalar(fm.get("redaction_code")),
        _coerce_scalar(fm.get("redaction_inferred_content_type")),
        _coerce_scalar(fm.get("image_type")),
        bool(fm.get("ufo_anomaly_detected")),
        _coerce_scalar(fm.get("ufo_anomaly_type")),
        _coerce_scalar(fm.get("ufo_anomaly_rationale")),
        bool(fm.get("cryptid_anomaly_detected")),
        _coerce_scalar(fm.get("cryptid_anomaly_type")),
        _coerce_scalar(fm.get("cryptid_anomaly_rationale")),
        _coerce_scalar(fm.get("image_description_en")),
        _coerce_scalar(fm.get("image_description_pt_br")),
        _coerce_scalar(fm.get("source_png")),
    )


def index_one_doc(cur, archive: Path, lang: str, batch_size: int, no_embed: bool = False) -> tuple[int, int]:
    """Index one built archive: upsert its document row and replace all its chunks.

    Existing chunks of the document are deleted and re-inserted, which makes
    re-runs idempotent. The caller owns the transaction (commit / rollback).

    Args:
        cur: Database cursor used for every statement.
        archive: ``<doc-id>--subagent`` directory holding ``_index.json`` and ``chunks/``.
        lang: ``"pt"`` embeds the Portuguese text, anything else the English
            text; the other language is used when the preferred one is empty.
        batch_size: Number of texts per embed-service call (must be >= 1 when embedding).
        no_embed: When true, skip embedding and store NULL vectors.

    Returns:
        ``(chunks inserted, chunks inserted with an embedding)``. ``(0, 0)``
        when the archive has no ``_index.json`` or the index lists no chunks.

    Raises:
        ValueError: *batch_size* is below 1 while embedding is enabled.
        RuntimeError: The embed service returned a different number of vectors
            than there are chunks.
    """
    # Reject a bad batch size before touching the database: zero would fail
    # late, and a negative step would embed nothing and leave the doc without
    # chunks after the DELETE below.
    if not no_embed and batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    idx_path = archive / "_index.json"
    if not idx_path.exists():
        return (0, 0)
    idx = json.loads(idx_path.read_text(encoding="utf-8"))
    doc_id = idx.get("doc_id") or archive.name.removesuffix("--subagent")

    upsert_document(cur, doc_id, idx, archive)

    # Wipe + re-insert chunks for this doc (idempotency)
    cur.execute("DELETE FROM public.chunks WHERE doc_id = %s", (doc_id,))

    entries = idx.get("chunks", [])
    if not entries:
        return (0, 0)

    chunks_dir = archive / "chunks"
    rows: list[tuple] = []
    texts_for_embed: list[str] = []

    for entry in entries:
        cid = entry.get("chunk_id")
        if not cid:
            continue
        chunk_file = chunks_dir / f"{cid}.md"
        if not chunk_file.exists():
            continue
        fm, en, pt = read_chunk_md(chunk_file)
        preferred = pt if lang == "pt" else en
        texts_for_embed.append(preferred or en or pt or "")
        rows.append(_build_chunk_row(doc_id, cid, entry, fm, en, pt))

    # Embed in batches (or skip -> NULL embeddings, filled in later by a backfill pass)
    all_embeddings: list[list[float] | None] = []
    if no_embed:
        all_embeddings = [None] * len(texts_for_embed)
    else:
        for start in range(0, len(texts_for_embed), batch_size):
            all_embeddings.extend(embed_batch(texts_for_embed[start : start + batch_size]))
        # zip() below stops at the shorter sequence; refuse to drop chunks silently.
        if len(all_embeddings) != len(rows):
            raise RuntimeError(
                f"{doc_id}: got {len(all_embeddings)} embeddings for {len(rows)} chunks"
            )

    # Insert with vectors (cast text -> vector in SQL)
    insert_sql = """
        INSERT INTO public.chunks (
            doc_id, chunk_id, page, order_in_page, order_global, type, bbox,
            content_en, content_pt, ocr_confidence, classification, formatting,
            cross_page_hint, prev_chunk, next_chunk, related_image, related_table,
            redaction_code, redaction_inferred, image_type,
            ufo_anomaly, ufo_anomaly_type, ufo_rationale,
            cryptid_anomaly, cryptid_anomaly_type, cryptid_rationale,
            image_desc_en, image_desc_pt, source_png, embedding
        )
        VALUES (
            %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s, %s::vector
        )
        """
    n_embedded = 0
    for row, vec in zip(rows, all_embeddings):
        cur.execute(insert_sql, row + (vector_literal(vec) if vec is not None else None,))
        if vec is not None:
            n_embedded += 1

    return (len(rows), n_embedded)
|
|
|
|
|
|
def is_already_indexed(cur, doc_id: str) -> bool:
    """Report whether ``public.documents`` already holds a row for *doc_id*."""
    query = "SELECT 1 FROM public.documents WHERE doc_id = %s"
    cur.execute(query, (doc_id,))
    row = cur.fetchone()
    return row is not None
|
|
|
|
|
|
def main():
    """Command-line entry point: index built archives into Postgres.

    Parses the CLI flags, checks that a database URL is configured, probes the
    embed service (unless ``--no-embed``), then indexes each discovered archive
    in its own transaction: commit on success, rollback on failure. A failing
    document does not stop the run.

    Exit status:
        0 when every processed document was indexed; 1 when the database URL
        is missing, the embed service is unreachable, the requested doc is not
        built, or at least one document failed; 2 on invalid arguments.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", default=None, help="Index a single doc (no --subagent suffix)")
    ap.add_argument("--lang", choices=["pt", "en"], default="pt", help="Language to embed (default: pt)")
    ap.add_argument("--batch-size", type=int, default=16)
    ap.add_argument("--skip-existing", action="store_true")
    ap.add_argument("--no-embed", action="store_true",
                    help="Insert chunks with NULL embeddings (fast); backfill vectors later")
    args = ap.parse_args()

    # A zero batch size fails every document; a negative one embeds nothing,
    # so documents would end up with their chunks deleted and none re-inserted.
    if not args.no_embed and args.batch_size < 1:
        ap.error("--batch-size must be >= 1")

    if not DATABASE_URL:
        sys.stderr.write("✗ Set DATABASE_URL (or SUPABASE_DB_URL) env var\n")
        sys.exit(1)

    # Probe embed service (skipped in --no-embed mode)
    if not args.no_embed:
        try:
            r = requests.get(f"{EMBED_URL}/health", timeout=10)
            r.raise_for_status()
            print(f"  ✓ embed service: {EMBED_URL} → {r.json()}")
        except Exception as e:
            sys.stderr.write(f"✗ embed service unreachable at {EMBED_URL}: {e}\n")
            sys.exit(1)
    else:
        print("  ⚠ --no-embed: chunks indexed with NULL vectors (BM25 only until backfill)")

    archives = discover_built_docs()
    if args.doc_id:
        archives = [a for a in archives if a.name.removesuffix("--subagent") == args.doc_id]
        if not archives:
            sys.stderr.write(f"✗ doc not built yet: raw/{args.doc_id}--subagent missing\n")
            sys.exit(1)

    print(f"  found {len(archives)} built doc(s)")
    t0 = time.time()
    total_chunks = 0
    total_docs = 0
    failed_docs = 0

    with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
        for archive in archives:
            doc_id = archive.name.removesuffix("--subagent")
            if args.skip_existing:
                with conn.cursor() as cur:
                    if is_already_indexed(cur, doc_id):
                        print(f"  ⊘ skip {doc_id} (already indexed)")
                        continue
            t_doc = time.time()
            try:
                with conn.cursor() as cur:
                    n_chunks, n_embed = index_one_doc(cur, archive, args.lang, args.batch_size, args.no_embed)
                conn.commit()
                wall = round(time.time() - t_doc, 1)
                print(f"  ✓ {doc_id} · {n_chunks} chunks · {n_embed} embedded · {wall}s")
                total_chunks += n_chunks
                total_docs += 1
            except Exception as e:
                # Per-document boundary: undo this doc's partial work and carry on.
                conn.rollback()
                failed_docs += 1
                print(f"  ✗ {doc_id} FAILED: {e}")

    print(f"\nDONE — {total_docs} docs · {total_chunks} chunks · {round(time.time() - t0, 1)}s total")

    # Surface partial failure to the calling shell instead of exiting 0.
    if failed_docs:
        sys.stderr.write(f"✗ {failed_docs} doc(s) failed to index\n")
        sys.exit(1)


if __name__ == "__main__":
    main()
|