#!/usr/bin/env python3 """ 58_backfill_embeddings.py — Fill in chunks.embedding for rows inserted with NULL vectors (by 30-index-chunks-to-db.py --no-embed). Runs independently of the fast index pass so the web app is usable (BM25) while dense vectors are computed in the background. Processes chunks WHERE embedding IS NULL in batches, calling the embed-service, and UPDATEs each row. Resumable: re-run to continue where it left off. Run (inside disclosure-internal network, or with tunnels): DATABASE_URL=postgresql://postgres:...@db:5432/postgres \ EMBED_SERVICE_URL=http://embed:8000 \ python3 scripts/maintain/58_backfill_embeddings.py [--lang pt] [--batch-size 16] """ from __future__ import annotations import argparse import os import sys import time try: import psycopg import requests except ImportError as e: sys.exit(f"pip install psycopg[binary] requests # missing: {e}") DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL") EMBED_URL = os.getenv("EMBED_SERVICE_URL", "http://localhost:8000") def embed_batch(texts: list[str]) -> list[list[float]]: # Cold-start of BGE-M3 takes ~8s per text on CPU; first call can run ~minutes # for a batch. Bump timeout to 10 minutes so the first batch doesn't kill the run. resp = requests.post(f"{EMBED_URL}/embed", json={"texts": texts}, timeout=600) resp.raise_for_status() return resp.json()["embeddings"] def vector_literal(vec: list[float]) -> str: return "[" + ",".join(repr(float(x)) for x in vec) + "]" def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--lang", choices=["pt", "en"], default="pt") ap.add_argument("--batch-size", type=int, default=16) args = ap.parse_args() if not DATABASE_URL: sys.exit("DATABASE_URL not set") # health probe r = requests.get(f"{EMBED_URL}/health", timeout=10) r.raise_for_status() print(f"embed service: {EMBED_URL} → {r.json()}") field = "content_pt" if args.lang == "pt" else "content_en" other = "content_en" if args.lang == "pt" else "content_pt" with psycopg.connect(DATABASE_URL, autocommit=False) as conn: with conn.cursor() as cur: cur.execute("SELECT count(*) FROM public.chunks WHERE embedding IS NULL") total = cur.fetchone()[0] print(f"chunks needing embedding: {total}") if total == 0: return 0 done = 0 t0 = time.time() while True: with conn.cursor() as cur: cur.execute( f"""SELECT chunk_pk, COALESCE(NULLIF({field}, ''), {other}, '') FROM public.chunks WHERE embedding IS NULL ORDER BY chunk_pk LIMIT %s""", (args.batch_size,), ) batch = cur.fetchall() if not batch: break pks = [b[0] for b in batch] texts = [b[1] or "" for b in batch] try: vecs = embed_batch(texts) except Exception as e: print(f" embed error at pk {pks[0]}: {e} — retrying once in 5s") time.sleep(5) vecs = embed_batch(texts) with conn.cursor() as cur: for pk, vec in zip(pks, vecs): cur.execute( "UPDATE public.chunks SET embedding = %s::vector WHERE chunk_pk = %s", (vector_literal(vec), pk), ) conn.commit() done += len(batch) if done % 320 == 0 or done >= total: rate = done / max(1e-6, time.time() - t0) eta = (total - done) / max(1e-6, rate) print(f" {done}/{total} · {rate:.0f}/s · ETA {eta/60:.0f}min", flush=True) print(f"✓ backfill complete: {done} embeddings in {(time.time()-t0)/60:.1f}min") return 0 if __name__ == "__main__": sys.exit(main())