#!/usr/bin/env python3 """ 30-index-chunks-to-db.py — Populate Postgres `documents` + `chunks` tables from raw/--subagent/{_index.json, chunks/c*.md}. Embeds each chunk via the embed-service (BGE-M3, 1024-dim dense) and UPSERTs into pgvector. Idempotent: re-running re-embeds + overwrites. Per-doc transaction. Resumable: pass --skip-existing to skip docs already indexed. Usage: ./30-index-chunks-to-db.py # all archived docs ./30-index-chunks-to-db.py --doc-id doc-342-... # single doc ./30-index-chunks-to-db.py --lang pt # which content field to embed (default: pt) ./30-index-chunks-to-db.py --skip-existing # incremental ./30-index-chunks-to-db.py --batch-size 16 # chunks per embed call """ from __future__ import annotations import argparse import json import os import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Iterable, Optional try: import yaml import psycopg from psycopg.types.json import Jsonb import requests except ImportError as e: sys.stderr.write(f"pip3 install pyyaml psycopg[binary] requests # missing: {e}\n") sys.exit(1) UFO_ROOT = Path(os.getenv("UFO_ROOT", "/Users/guto/ufo")) RAW_ROOT = UFO_ROOT / "raw" WIKI_DOCS = UFO_ROOT / "wiki" / "documents" DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL") EMBED_URL = os.getenv("EMBED_SERVICE_URL", "http://localhost:8000") def utc_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") # Known free-text string fields in chunk frontmatter that may contain literal # colons. The Sonnet generator sometimes leaves them unquoted, which breaks # yaml.safe_load. We quote them defensively before parsing. 
import re as _re  # only the frontmatter auto-quoting helpers below use regexes

STRING_FIELDS_NEEDING_QUOTE = {
    "ufo_anomaly_rationale",
    "cryptid_anomaly_rationale",
    "image_description_en",
    "image_description_pt_br",
    "extracted_text",
    "redaction_inferred_content_type",
}

# Matches one frontmatter line `<indent><known field>: <rest>`. Keys are sorted
# so the compiled alternation does not depend on set iteration order.
_STRING_FIELD_LINE_RE = _re.compile(
    r"^(\s*)("
    + "|".join(_re.escape(k) for k in sorted(STRING_FIELDS_NEEDING_QUOTE))
    + r"):\s*(.*)$"
)

# `key: None` at end of line, i.e. a Python repr that leaked into the YAML.
_PY_NONE_VALUE_RE = _re.compile(r":[ \t]+None([ \t]*)$", _re.MULTILINE)


def _autoquote_free_text(yaml_text: str) -> str:
    """Make generator-written frontmatter safer for ``yaml.safe_load``.

    Two rewrites are applied:

    1. A bare ``None`` value at end of line becomes the YAML literal ``null``.
       PyYAML would otherwise load it as the string "None", which breaks the
       float()/int() coercion done downstream.
    2. For keys in STRING_FIELDS_NEEDING_QUOTE, an unquoted single-line value
       is wrapped in single quotes (embedded ``'`` doubled), so colons inside
       the value survive parsing.

    Values that are empty, null-like (``null``/``None``/``~``), already quoted,
    or block scalars (``|``/``>``) are left alone. Applying the function twice
    gives the same result as applying it once.
    """
    yaml_text = _PY_NONE_VALUE_RE.sub(r": null\1", yaml_text)
    out = []
    for line in yaml_text.split("\n"):
        m = _STRING_FIELD_LINE_RE.match(line)
        if not m:
            out.append(line)
            continue
        indent, key, value = m.group(1), m.group(2), m.group(3).rstrip()
        if value in ("", "null", "None", "~") or value.startswith(("'", '"', "|", ">")):
            out.append(line)
            continue
        # YAML single-quoted scalars escape an embedded ' by doubling it.
        escaped = value.replace("'", "''")
        out.append(f"{indent}{key}: '{escaped}'")
    return "\n".join(out)


def _split_frontmatter(raw: str) -> tuple[str, str] | None:
    """Split a ``---``-fenced frontmatter header from the rest of *raw*.

    Returns ``(frontmatter_text, body)``, or None when *raw* has no opening
    fence or no closing fence. The closing fence must start a line. A ``---``
    inside a frontmatter value therefore no longer truncates the header, and a
    missing fence no longer yields a bogus ``raw[3:-1]`` slice.
    """
    if not raw.startswith("---"):
        return None
    end = raw.find("\n---", 3)
    if end == -1:
        return None
    return raw[3:end].strip(), raw[end + len("\n---"):]


def _load_yaml_mapping(text: str) -> dict:
    """``yaml.safe_load`` *text*; return {} on YAML errors or non-mapping results.

    Callers use ``.get`` on the result, so a list or str document (valid YAML,
    wrong shape) is treated the same as unparseable input.
    """
    try:
        data = yaml.safe_load(text)
    except yaml.YAMLError:
        return {}
    return data if isinstance(data, dict) else {}


def read_chunk_md(path: Path) -> tuple[dict, str, str]:
    """Parse one chunk markdown file into (frontmatter, content_en, content_pt).

    The content comes from the last ``**EN:**`` / ``**PT-BR:**`` line in the
    body. A file without an opening ``---`` fence yields ``({}, "", "")``.
    Unterminated or non-mapping frontmatter yields an empty dict, but the
    content lines are still extracted.
    """
    raw = path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, "", ""
    parts = _split_frontmatter(raw)
    if parts is None:
        # Unterminated header: no reliable YAML, but still scan the text for
        # content lines rather than silently dropping the chunk's text.
        fm: dict = {}
        body = raw
    else:
        fm_text, body = parts
        fm = _load_yaml_mapping(_autoquote_free_text(fm_text))
    en, pt = "", ""
    for line in body.split("\n"):
        s = line.strip()
        if s.startswith("**EN:**"):
            en = s.removeprefix("**EN:**").strip()
        elif s.startswith("**PT-BR:**"):
            pt = s.removeprefix("**PT-BR:**").strip()
    return fm, en, pt


def discover_built_docs() -> list[Path]:
    """Return the raw/*--subagent directories that contain an _index.json, sorted."""
    return sorted(p for p in RAW_ROOT.glob("*--subagent") if (p / "_index.json").exists())


def embed_batch(texts: list[str]) -> list[list[float]]:
    """POST *texts* to the embed service's /embed and return one vector per text.

    Vectors are requested normalized. Per the module docstring they are
    BGE-M3 dense, 1024-dim.

    Raises:
        requests.HTTPError: on a non-2xx response.
        RuntimeError: if the service returns a different number of vectors
            than texts sent, which would otherwise let chunks and vectors fall
            out of step.
    """
    if not texts:
        return []
    resp = requests.post(
        f"{EMBED_URL}/embed",
        json={"texts": texts, "normalize": True},
        timeout=120,
    )
    resp.raise_for_status()
    vectors = resp.json()["embeddings"]
    if len(vectors) != len(texts):
        raise RuntimeError(
            f"embed service returned {len(vectors)} vectors for {len(texts)} texts"
        )
    return vectors


def vector_literal(vec: list[float]) -> str:
    """Render *vec* in pgvector's text input form, e.g. '[0.100000,-0.200000]'.

    Each component is formatted with 6 decimal places.
    """
    return "[" + ",".join(f"{v:.6f}" for v in vec) + "]"


def read_wiki_document_meta(doc_id: str) -> dict:
    """Return the YAML frontmatter of wiki/documents/<doc_id>.md as a dict.

    upsert_document reads canonical_title, collection, document_class,
    page_count, highest_classification / classification and
    content_classification from it. This is best-effort: it returns {} when
    the file is missing or unreadable, has no closed frontmatter, or the
    frontmatter is not a YAML mapping.
    """
    p = WIKI_DOCS / f"{doc_id}.md"
    try:
        raw = p.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return {}
    parts = _split_frontmatter(raw)
    if parts is None:
        return {}
    return _load_yaml_mapping(parts[0])


def upsert_document(cur, doc_id: str, idx: dict, archive_path: Path) -> None:
    """Insert or update the public.documents row for *doc_id*.

    Values come from the archive's _index.json (*idx*) and the wiki
    frontmatter. Where both are consulted, the index wins, and canonical_title
    finally falls back to doc_id. On conflict every column is overwritten and
    ingested_at is set to NOW().
    """
    wiki_meta = read_wiki_document_meta(doc_id)
    canonical_title = (
        idx.get("canonical_title")
        or wiki_meta.get("canonical_title")
        or doc_id
    )
    # content_class is bound as an array, so wrap a scalar classification.
    content_class = wiki_meta.get("content_classification")
    if content_class is not None and not isinstance(content_class, list):
        content_class = [content_class]

    cur.execute(
        """
        INSERT INTO public.documents (
            doc_id, canonical_title, collection, document_class, page_count,
            classification, content_class, schema_version, build_approach,
            build_model, built_at, raw_path
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (doc_id) DO UPDATE SET
            canonical_title = EXCLUDED.canonical_title,
            collection = EXCLUDED.collection,
            document_class = EXCLUDED.document_class,
            page_count = EXCLUDED.page_count,
            classification = EXCLUDED.classification,
            content_class = EXCLUDED.content_class,
            schema_version = EXCLUDED.schema_version,
            build_approach = EXCLUDED.build_approach,
            build_model = EXCLUDED.build_model,
            built_at = EXCLUDED.built_at,
            raw_path = EXCLUDED.raw_path,
            ingested_at = NOW()
        """,
        (
            doc_id,
            canonical_title,
            wiki_meta.get("collection"),
            wiki_meta.get("document_class"),
            idx.get("total_pages") or wiki_meta.get("page_count"),
            wiki_meta.get("highest_classification") or wiki_meta.get("classification"),
            content_class,
            idx.get("schema_version", "0.2.0"),
            idx.get("build_approach"),
            idx.get("build_model"),
            # NOTE(review): feeds the built_at column; confirm the _index.json
            # key really is "build_at" and not "built_at".
            idx.get("build_at"),
            str(archive_path.resolve()),
        ),
    )


def _scalar(v):
    """Coerce a frontmatter value into something psycopg can bind to a text column.

    None/str/int/float/bool pass through unchanged. Dicts, lists and tuples
    become JSON text; Sonnet sometimes emits a mapping where a scalar is
    expected (e.g. `redaction_inferred_content_type: {kind: x, note: y}`).
    Anything else, including non-JSON-serialisable containers, becomes str(v).
    """
    if v is None or isinstance(v, (str, int, float, bool)):
        return v
    if isinstance(v, (dict, list, tuple)):
        try:
            return json.dumps(v, ensure_ascii=False)
        except (TypeError, ValueError):
            return str(v)
    return str(v)


def _to_float_or_none(v):
    """Parse an ocr_confidence-style value as float, else None.

    Numbers (bools included) are converted. Strings are parsed unless blank or
    "None"/"null". Unparseable strings such as "high" give None instead of
    failing the whole document.
    """
    if isinstance(v, (int, float)):
        return float(v)
    if isinstance(v, str) and v.strip() not in ("", "None", "null"):
        try:
            return float(v)
        except ValueError:
            return None
    return None


def _as_text_list(v) -> list[str]:
    """Normalise a list-ish frontmatter value (e.g. `formatting`) to list[str].

    A bare scalar such as `formatting: bold` becomes ["bold"] instead of being
    iterated character by character. None items are dropped, and falsy values
    give [].
    """
    if not v:
        return []
    items = v if isinstance(v, (list, tuple, set, frozenset, dict)) else [v]
    return [x if isinstance(x, str) else str(x) for x in items if x is not None]


def _chunk_row(doc_id: str, cid: str, entry: dict, fm: dict, en: str, pt: str) -> tuple:
    """Build the chunks-row parameters (all columns except embedding).

    Order matches _CHUNK_INSERT_SQL. Chunk frontmatter wins over the
    _index.json entry for fields present in both.
    """
    return (
        doc_id,
        cid,
        int(fm.get("page") or entry.get("page") or 0),
        int(fm.get("order_in_page") or entry.get("order_in_page") or 0),
        int(fm.get("order_global") or entry.get("order_global") or 0),
        str(fm.get("type") or entry.get("type") or "unknown"),
        Jsonb(fm.get("bbox") or entry.get("bbox") or {}),
        en or None,
        pt or None,
        _to_float_or_none(fm.get("ocr_confidence")),
        _scalar(fm.get("classification")),
        _as_text_list(fm.get("formatting")),
        _scalar(fm.get("cross_page_hint")),
        _scalar(fm.get("prev_chunk")),
        _scalar(fm.get("next_chunk")),
        _scalar(fm.get("related_image")),
        _scalar(fm.get("related_table")),
        _scalar(fm.get("redaction_code")),
        _scalar(fm.get("redaction_inferred_content_type")),
        _scalar(fm.get("image_type")),
        bool(fm.get("ufo_anomaly_detected") or False),
        _scalar(fm.get("ufo_anomaly_type")),
        _scalar(fm.get("ufo_anomaly_rationale")),
        bool(fm.get("cryptid_anomaly_detected") or False),
        _scalar(fm.get("cryptid_anomaly_type")),
        _scalar(fm.get("cryptid_anomaly_rationale")),
        _scalar(fm.get("image_description_en")),
        _scalar(fm.get("image_description_pt_br")),
        _scalar(fm.get("source_png")),
    )


# The last placeholder casts pgvector's text form (or NULL) to vector.
_CHUNK_INSERT_SQL = """
    INSERT INTO public.chunks (
        doc_id, chunk_id, page, order_in_page, order_global, type, bbox,
        content_en, content_pt, ocr_confidence, classification, formatting,
        cross_page_hint, prev_chunk, next_chunk, related_image, related_table,
        redaction_code, redaction_inferred, image_type,
        ufo_anomaly, ufo_anomaly_type, ufo_rationale,
        cryptid_anomaly, cryptid_anomaly_type, cryptid_rationale,
        image_desc_en, image_desc_pt, source_png, embedding
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s,
        %s, %s, %s,
        %s, %s, %s,
        %s, %s, %s,
        %s, %s, %s, %s::vector
    )
"""


def index_one_doc(cur, archive: Path, lang: str, batch_size: int, no_embed: bool = False) -> tuple[int, int]:
    """Index one built archive inside the caller's transaction.

    Steps:
    1. Upsert the documents row.
    2. Delete the doc's existing chunks.
    3. Insert one chunks row per _index.json entry whose chunks/<chunk_id>.md
       exists.

    The embedded text is the *lang* content ("pt" or "en"), falling back to
    the other language when empty. With *no_embed* the embedding column is
    left NULL for a later backfill. The caller commits or rolls back.

    Returns:
        (chunks inserted, chunks inserted with an embedding). (0, 0) if
        _index.json is missing (nothing written) or no chunk file was found
        (document upserted, old chunks deleted).

    Raises:
        ValueError: if embedding and *batch_size* < 1. A negative step would
            silently embed nothing, and zero is invalid for range().
        RuntimeError: if the number of vectors does not match the number of
            chunks.
    """
    if not no_embed and batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    idx_path = archive / "_index.json"
    if not idx_path.exists():
        return (0, 0)
    idx = json.loads(idx_path.read_text(encoding="utf-8"))
    doc_id = idx.get("doc_id") or archive.name.removesuffix("--subagent")

    upsert_document(cur, doc_id, idx, archive)

    # Wipe + re-insert chunks for this doc (idempotency)
    cur.execute("DELETE FROM public.chunks WHERE doc_id = %s", (doc_id,))

    chunks_dir = archive / "chunks"
    rows: list[tuple] = []
    texts_for_embed: list[str] = []
    for entry in idx.get("chunks", []):
        cid = entry.get("chunk_id")
        if not cid:
            continue
        chunk_file = chunks_dir / f"{cid}.md"
        if not chunk_file.exists():
            continue
        fm, en, pt = read_chunk_md(chunk_file)
        preferred = pt if lang == "pt" else en
        texts_for_embed.append(preferred or en or pt or "")
        rows.append(_chunk_row(doc_id, cid, entry, fm, en, pt))

    if not rows:
        return (0, 0)

    # Embed in batches (or skip → NULL embeddings, filled in later by a backfill pass)
    embeddings: list[list[float] | None]
    if no_embed:
        embeddings = [None] * len(texts_for_embed)
    else:
        embeddings = []
        for start in range(0, len(texts_for_embed), batch_size):
            embeddings.extend(embed_batch(texts_for_embed[start:start + batch_size]))

    # zip() would silently drop unmatched chunks after the DELETE above.
    if len(embeddings) != len(rows):
        raise RuntimeError(
            f"{doc_id}: got {len(embeddings)} embeddings for {len(rows)} chunks"
        )

    params = [
        row + (vector_literal(vec) if vec is not None else None,)
        for row, vec in zip(rows, embeddings)
    ]
    cur.executemany(_CHUNK_INSERT_SQL, params)
    n_embedded = sum(vec is not None for vec in embeddings)
    return (len(rows), n_embedded)


def is_already_indexed(cur, doc_id: str) -> bool:
    """True if public.documents already has a row for *doc_id*.

    Only the documents row is checked. When the row was written by this
    script, its chunks were committed in the same transaction.
    """
    cur.execute(
        "SELECT 1 FROM public.documents WHERE doc_id = %s",
        (doc_id,),
    )
    return cur.fetchone() is not None


def _archive_doc_id(archive: Path) -> str:
    """Resolve the doc_id that index_one_doc will write for *archive*.

    This is `doc_id` from _index.json when present and truthy, otherwise the
    directory name without `--subagent`. It also falls back to the directory
    name if _index.json cannot be read or parsed.
    """
    fallback = archive.name.removesuffix("--subagent")
    try:
        idx = json.loads((archive / "_index.json").read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return fallback
    if isinstance(idx, dict) and idx.get("doc_id"):
        return str(idx["doc_id"])
    return fallback


def main():
    """CLI entry point: index every built archive (or one via --doc-id) into Postgres.

    Each doc is indexed and committed in its own transaction. A failing doc is
    rolled back and reported, and the run continues. Exits non-zero when
    DATABASE_URL is missing, the embed service is unreachable, the requested
    doc is not built, or any doc failed to index.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", default=None, help="Index a single doc (no --subagent suffix)")
    ap.add_argument("--lang", choices=["pt", "en"], default="pt", help="Language to embed (default: pt)")
    ap.add_argument("--batch-size", type=int, default=16)
    ap.add_argument("--skip-existing", action="store_true")
    ap.add_argument("--no-embed", action="store_true",
                    help="Insert chunks with NULL embeddings (fast); backfill vectors later")
    args = ap.parse_args()
    if args.batch_size < 1:
        ap.error("--batch-size must be >= 1")

    if not DATABASE_URL:
        sys.stderr.write("✗ Set DATABASE_URL (or SUPABASE_DB_URL) env var\n")
        sys.exit(1)

    # Probe embed service (skipped in --no-embed mode)
    if not args.no_embed:
        try:
            r = requests.get(f"{EMBED_URL}/health", timeout=10)
            r.raise_for_status()
            print(f" ✓ embed service: {EMBED_URL} → {r.json()}")
        except Exception as e:
            sys.stderr.write(f"✗ embed service unreachable at {EMBED_URL}: {e}\n")
            sys.exit(1)
    else:
        print(" ⚠ --no-embed: chunks indexed with NULL vectors (BM25 only until backfill)")

    archives = discover_built_docs()
    if args.doc_id:
        archives = [a for a in archives if a.name.removesuffix("--subagent") == args.doc_id]
        if not archives:
            sys.stderr.write(f"✗ doc not built yet: raw/{args.doc_id}--subagent missing\n")
            sys.exit(1)

    print(f" found {len(archives)} built doc(s)")

    t0 = time.monotonic()
    total_chunks = 0
    total_docs = 0
    failed: list[str] = []
    with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
        for archive in archives:
            # Same doc_id index_one_doc writes, so --skip-existing checks the
            # right row even when _index.json overrides the directory name.
            doc_id = _archive_doc_id(archive)
            if args.skip_existing:
                with conn.cursor() as cur:
                    if is_already_indexed(cur, doc_id):
                        print(f" ⊘ skip {doc_id} (already indexed)")
                        continue
            t_doc = time.monotonic()
            try:
                with conn.cursor() as cur:
                    n_chunks, n_embed = index_one_doc(cur, archive, args.lang, args.batch_size, args.no_embed)
                conn.commit()
                wall = round(time.monotonic() - t_doc, 1)
                print(f" ✓ {doc_id} · {n_chunks} chunks · {n_embed} embedded · {wall}s")
                total_chunks += n_chunks
                total_docs += 1
            except Exception as e:
                # Per-doc isolation: undo this doc's partial work and keep going.
                conn.rollback()
                failed.append(doc_id)
                print(f" ✗ {doc_id} FAILED: {e}")

    print(f"\nDONE — {total_docs} docs · {total_chunks} chunks · {round(time.monotonic() - t0, 1)}s total")
    if failed:
        # Surface partial failure to shells / CI instead of exiting 0.
        sys.stderr.write(f"✗ {len(failed)} doc(s) failed: {', '.join(failed)}\n")
        sys.exit(1)


if __name__ == "__main__":
    main()