disclosure-bureau/scripts/maintain/58_backfill_embeddings.py
Luiz Gustavo fe19bb9c57 add page↔document navigation + DB repopulation tooling
Doc page (/d/[docId]/[page]) gains prev/next navigation bars (top + bottom):
within a doc it steps page-by-page; at the first/last page it jumps to the
previous/next document. Replaces the disabled-at-boundary links.

Indexer tooling for the VPS repopulation:
- 30-index-chunks-to-db.py: add --no-embed (fast BM25-only index; vectors
  backfilled separately) so the app is usable in minutes, not hours of CPU
  embedding.
- 57_load_relations_from_json.py: load typed relations into public.relations
  from reextract structured fields (deterministic ids, no fuzzy guessing).
- 58_backfill_embeddings.py: async pass to fill chunks.embedding (NULL rows)
  via the embed-service.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 14:28:14 -03:00

106 lines
3.7 KiB
Python

#!/usr/bin/env python3
"""
58_backfill_embeddings.py — Fill in chunks.embedding for rows inserted with
NULL vectors (by 30-index-chunks-to-db.py --no-embed). Runs independently of
the fast index pass so the web app is usable (BM25) while dense vectors are
computed in the background.
Processes chunks WHERE embedding IS NULL in batches, calling the embed-service,
and UPDATEs each row. Resumable: re-run to continue where it left off.
Run (inside disclosure-internal network, or with tunnels):
DATABASE_URL=postgresql://postgres:...@db:5432/postgres \
EMBED_SERVICE_URL=http://embed:8000 \
python3 scripts/maintain/58_backfill_embeddings.py [--lang pt] [--batch-size 16]
"""
from __future__ import annotations
import argparse
import os
import sys
import time
try:
import psycopg
import requests
except ImportError as e:
sys.exit(f"pip install psycopg[binary] requests # missing: {e}")
# Connection string for Postgres: DATABASE_URL wins when it is set and
# non-empty, otherwise SUPABASE_DB_URL is used (None if neither is set).
DATABASE_URL = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL")
# Base URL of the embed-service; the script calls its /health and /embed routes.
EMBED_URL = os.environ.get("EMBED_SERVICE_URL", "http://localhost:8000")
def embed_batch(texts: list[str]) -> list[list[float]]:
    """Return one embedding vector per entry of *texts* from the embed-service.

    POSTs ``{"texts": texts}`` to ``{EMBED_URL}/embed`` and returns the
    ``"embeddings"`` field of the JSON response.

    Args:
        texts: Strings to embed. An empty list returns ``[]`` without
            contacting the service.

    Returns:
        The list found under ``"embeddings"`` in the response, in the order
        the service returned it.

    Raises:
        requests.HTTPError: The service answered with a non-2xx status.
        requests.RequestException: Connection failure or timeout (120 s).
        KeyError: The response JSON has no ``"embeddings"`` key.
        ValueError: The number of embeddings differs from ``len(texts)``.
    """
    if not texts:
        # Nothing to embed; skip the HTTP round-trip.
        return []
    resp = requests.post(f"{EMBED_URL}/embed", json={"texts": texts}, timeout=120)
    resp.raise_for_status()
    embeddings = resp.json()["embeddings"]
    # Callers pair results with their inputs positionally, so a short or long
    # response must fail loudly instead of being silently truncated by zip().
    if len(embeddings) != len(texts):
        raise ValueError(
            f"embed service returned {len(embeddings)} embeddings "
            f"for {len(texts)} texts"
        )
    return embeddings
def vector_literal(vec: list[float]) -> str:
    """Format *vec* as a bracketed, comma-separated text literal.

    Every element is coerced to ``float`` and rendered with ``repr``; the
    parts are joined with commas (no spaces) and wrapped in ``[`` ``]``.
    An empty input yields ``"[]"``.
    """
    components = map(repr, map(float, vec))
    return "[{}]".format(",".join(components))
def main() -> int:
    """Fill ``public.chunks.embedding`` for every row where it is NULL.

    Flow:
      1. Parse ``--lang`` (``pt``/``en``, default ``pt``) and ``--batch-size``
         (default 16, must be >= 1).
      2. Probe ``{EMBED_URL}/health`` so a missing service fails before any
         database work starts.
      3. Repeatedly select up to ``batch_size`` rows with a NULL embedding
         (ordered by ``chunk_pk``), embed their text, UPDATE the rows and
         commit. Because each batch is committed on its own and the query
         only sees NULL rows, re-running the script resumes the work.

    The text embedded for a row is the column for the chosen language; if
    that is NULL or empty the other language's column is used, and finally
    the empty string.

    Returns:
        0 on success (including when there is nothing to do).

    Raises:
        SystemExit: Invalid arguments or no database URL configured.
        RuntimeError: The embed-service returned a different number of
            vectors than texts sent.
        Exception: Whatever ``embed_batch`` raises on its second attempt, or
            any database / HTTP error outside the single retry.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--lang", choices=["pt", "en"], default="pt")
    ap.add_argument("--batch-size", type=int, default=16)
    args = ap.parse_args()
    # LIMIT 0 would report a bogus "complete" and a negative LIMIT is a SQL
    # error, so reject both up front with a usage message.
    if args.batch_size < 1:
        ap.error("--batch-size must be >= 1")
    if not DATABASE_URL:
        sys.exit("DATABASE_URL not set")

    # health probe
    r = requests.get(f"{EMBED_URL}/health", timeout=10)
    r.raise_for_status()
    print(f"embed service: {EMBED_URL}{r.json()}")

    # Column names come from the fixed argparse choices above, never from free
    # user input, so interpolating them into the SQL text is safe.
    field = "content_pt" if args.lang == "pt" else "content_en"
    other = "content_en" if args.lang == "pt" else "content_pt"
    select_sql = f"""SELECT chunk_pk, COALESCE(NULLIF({field}, ''), {other}, '')
        FROM public.chunks WHERE embedding IS NULL
        ORDER BY chunk_pk LIMIT %s"""
    update_sql = "UPDATE public.chunks SET embedding = %s::vector WHERE chunk_pk = %s"

    # Print progress roughly every this many rows, whatever the batch size.
    report_every = 320

    with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT count(*) FROM public.chunks WHERE embedding IS NULL")
            total = cur.fetchone()[0]
        print(f"chunks needing embedding: {total}")
        if total == 0:
            return 0

        done = 0
        next_report = report_every
        t0 = time.time()
        while True:
            with conn.cursor() as cur:
                cur.execute(select_sql, (args.batch_size,))
                batch = cur.fetchall()
            if not batch:
                break
            pks = [row[0] for row in batch]
            texts = [row[1] or "" for row in batch]

            # Best-effort: one retry after a short pause; a second failure
            # propagates and aborts the run (already-committed batches stay).
            try:
                vecs = embed_batch(texts)
            except Exception as e:
                print(f" embed error at pk {pks[0]}: {e} — retrying once in 5s")
                time.sleep(5)
                vecs = embed_batch(texts)

            # zip() would silently drop unmatched rows; those rows would stay
            # NULL, be selected again, and the loop could spin forever.
            if len(vecs) != len(pks):
                raise RuntimeError(
                    f"embed service returned {len(vecs)} vectors for "
                    f"{len(pks)} chunks (first pk {pks[0]})"
                )

            with conn.cursor() as cur:
                cur.executemany(
                    update_sql,
                    [(vector_literal(vec), pk) for pk, vec in zip(pks, vecs)],
                )
            conn.commit()
            done += len(batch)

            # Threshold-based reporting: `done % 320 == 0` almost never fires
            # when the batch size does not divide 320.
            if done >= next_report or done >= total:
                next_report = (done // report_every + 1) * report_every
                rate = done / max(1e-6, time.time() - t0)
                # Rows may be added while we run; never show a negative ETA.
                eta = max(0, total - done) / max(1e-6, rate)
                print(f" {done}/{total} · {rate:.0f}/s · ETA {eta/60:.0f}min", flush=True)

    print(f"✓ backfill complete: {done} embeddings in {(time.time()-t0)/60:.1f}min")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())