disclosure-bureau/scripts/maintain/58_backfill_embeddings.py

109 lines
3.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
58_backfill_embeddings.py Fill in chunks.embedding for rows inserted with
NULL vectors (by 30-index-chunks-to-db.py --no-embed). Runs independently of
the fast index pass so the web app is usable (BM25) while dense vectors are
computed in the background.
Processes chunks WHERE embedding IS NULL in batches, calling the embed-service,
and UPDATEs each row. Resumable: re-run to continue where it left off.
Run (inside disclosure-internal network, or with tunnels):
DATABASE_URL=postgresql://postgres:...@db:5432/postgres \
EMBED_SERVICE_URL=http://embed:8000 \
python3 scripts/maintain/58_backfill_embeddings.py [--lang pt] [--batch-size 16]
"""
from __future__ import annotations
import argparse
import os
import sys
import time
try:
import psycopg
import requests
except ImportError as e:
sys.exit(f"pip install psycopg[binary] requests # missing: {e}")
DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL")
EMBED_URL = os.getenv("EMBED_SERVICE_URL", "http://localhost:8000")
def embed_batch(texts: list[str]) -> list[list[float]]:
W0+W1+W1.2: security hardening, observability, autocomplete, glitchtip, forgejo CI W0 — security hardening (5 fixes verified live on disclosure.top) - middleware: gate /api/admin/* same as /admin/* (F1) - imgproxy: tighten LOCAL_FILESYSTEM_ROOT from / to /var/lib/storage (F2) - studio: real basic-auth label (bcrypt hash, middleware reference) (F3) - relations: ENABLE ROW LEVEL SECURITY + public SELECT policy (F4) - migration 0003: fold is_searchable + hybrid_search update into canonical (TD#2) W1 — observability + resilience + autocomplete - studio: HOSTNAME=0.0.0.0 so Next.js binds on loopback for healthcheck - compose: PG_POOL_MAX=20, CLAUDE_CODE_OAUTH_TOKEN gated by separate env - claude-code.ts: subprocess timeout configurable (CLAUDE_CODE_TIMEOUT_MS) - openrouter.ts: retry with exponential backoff + Retry-After + in-memory circuit breaker (promotes FALLBACK after CB_THRESHOLD failures) - lib/logger.ts: pino logger (NDJSON prod / pretty dev) + withRequest helper - middleware: mints correlation_id, stamps x-correlation-id response header, emits structured http_request log per /api/* call - messages/route.ts: switch to structured logger - 60_meili_index.py: push documents + chunks into Meilisearch - /api/search/autocomplete: parallel meili search (docs + chunks), 5-8ms p50 - search-autocomplete.tsx: debounced dropdown wired into search-panel W1.2 — Glitchtip + Forgejo self-hosted - compose: glitchtip-redis + glitchtip-web + glitchtip-worker (v4.2) - compose: forgejo + forgejo-runner (server v9, runner v6) with group_add=988 - @sentry/nextjs SDK wired (instrumentation.ts + sentry.{client,server}.config.ts) - /api/admin/throw smoke endpoint (gated by W0-F1 middleware) - Synthetic event ingestion verified at glitchtip.disclosure.top - forgejo.disclosure.top up, repo discadmin/disclosure-bureau created, runner registered (labels: ubuntu-latest, docker) - .forgejo/workflows/ci.yml: typecheck + lint + build + npm audit + python syntax + compose validation Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 21:18:42 +00:00
# Cold-start of BGE-M3 takes ~8s per text on CPU; first call can run ~minutes
# for a batch. Bump timeout to 10 minutes so the first batch doesn't kill the run.
resp = requests.post(f"{EMBED_URL}/embed", json={"texts": texts}, timeout=600)
resp.raise_for_status()
return resp.json()["embeddings"]
def vector_literal(vec: list[float]) -> str:
return "[" + ",".join(repr(float(x)) for x in vec) + "]"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--lang", choices=["pt", "en"], default="pt")
ap.add_argument("--batch-size", type=int, default=16)
args = ap.parse_args()
if not DATABASE_URL:
sys.exit("DATABASE_URL not set")
# health probe
r = requests.get(f"{EMBED_URL}/health", timeout=10)
r.raise_for_status()
print(f"embed service: {EMBED_URL}{r.json()}")
field = "content_pt" if args.lang == "pt" else "content_en"
other = "content_en" if args.lang == "pt" else "content_pt"
with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
with conn.cursor() as cur:
cur.execute("SELECT count(*) FROM public.chunks WHERE embedding IS NULL")
total = cur.fetchone()[0]
print(f"chunks needing embedding: {total}")
if total == 0:
return 0
done = 0
t0 = time.time()
while True:
with conn.cursor() as cur:
cur.execute(
f"""SELECT chunk_pk, COALESCE(NULLIF({field}, ''), {other}, '')
FROM public.chunks WHERE embedding IS NULL
ORDER BY chunk_pk LIMIT %s""",
(args.batch_size,),
)
batch = cur.fetchall()
if not batch:
break
pks = [b[0] for b in batch]
texts = [b[1] or "" for b in batch]
try:
vecs = embed_batch(texts)
except Exception as e:
print(f" embed error at pk {pks[0]}: {e} — retrying once in 5s")
time.sleep(5)
vecs = embed_batch(texts)
with conn.cursor() as cur:
for pk, vec in zip(pks, vecs):
cur.execute(
"UPDATE public.chunks SET embedding = %s::vector WHERE chunk_pk = %s",
(vector_literal(vec), pk),
)
conn.commit()
done += len(batch)
if done % 320 == 0 or done >= total:
rate = done / max(1e-6, time.time() - t0)
eta = (total - done) / max(1e-6, rate)
print(f" {done}/{total} · {rate:.0f}/s · ETA {eta/60:.0f}min", flush=True)
print(f"✓ backfill complete: {done} embeddings in {(time.time()-t0)/60:.1f}min")
return 0
if __name__ == "__main__":
sys.exit(main())