Doc page (/d/[docId]/[page]) gains prev/next navigation bars (top + bottom): within a doc it steps page-by-page; at the first/last page it jumps to the previous/next document. Replaces the disabled-at-boundary links. Indexer tooling for the VPS repopulation: - 30-index-chunks-to-db.py: add --no-embed (fast BM25-only index; vectors backfilled separately) so the app is usable in minutes, not hours of CPU embedding. - 57_load_relations_from_json.py: load typed relations into public.relations from reextract structured fields (deterministic ids, no fuzzy guessing). - 58_backfill_embeddings.py: async pass to fill chunks.embedding (NULL rows) via the embed-service. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
106 lines
3.7 KiB
Python
106 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
58_backfill_embeddings.py — Fill in chunks.embedding for rows inserted with
|
|
NULL vectors (by 30-index-chunks-to-db.py --no-embed). Runs independently of
|
|
the fast index pass so the web app is usable (BM25) while dense vectors are
|
|
computed in the background.
|
|
|
|
Processes chunks WHERE embedding IS NULL in batches, calling the embed-service,
|
|
and UPDATEs each row. Resumable: re-run to continue where it left off.
|
|
|
|
Run (inside disclosure-internal network, or with tunnels):
|
|
DATABASE_URL=postgresql://postgres:...@db:5432/postgres \
|
|
EMBED_SERVICE_URL=http://embed:8000 \
|
|
python3 scripts/maintain/58_backfill_embeddings.py [--lang pt] [--batch-size 16]
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
try:
|
|
import psycopg
|
|
import requests
|
|
except ImportError as e:
|
|
sys.exit(f"pip install psycopg[binary] requests # missing: {e}")
|
|
|
|
DATABASE_URL = os.getenv("DATABASE_URL") or os.getenv("SUPABASE_DB_URL")
|
|
EMBED_URL = os.getenv("EMBED_SERVICE_URL", "http://localhost:8000")
|
|
|
|
|
|
def embed_batch(texts: list[str]) -> list[list[float]]:
|
|
resp = requests.post(f"{EMBED_URL}/embed", json={"texts": texts}, timeout=120)
|
|
resp.raise_for_status()
|
|
return resp.json()["embeddings"]
|
|
|
|
|
|
def vector_literal(vec: list[float]) -> str:
|
|
return "[" + ",".join(repr(float(x)) for x in vec) + "]"
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--lang", choices=["pt", "en"], default="pt")
|
|
ap.add_argument("--batch-size", type=int, default=16)
|
|
args = ap.parse_args()
|
|
|
|
if not DATABASE_URL:
|
|
sys.exit("DATABASE_URL not set")
|
|
|
|
# health probe
|
|
r = requests.get(f"{EMBED_URL}/health", timeout=10)
|
|
r.raise_for_status()
|
|
print(f"embed service: {EMBED_URL} → {r.json()}")
|
|
|
|
field = "content_pt" if args.lang == "pt" else "content_en"
|
|
other = "content_en" if args.lang == "pt" else "content_pt"
|
|
|
|
with psycopg.connect(DATABASE_URL, autocommit=False) as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT count(*) FROM public.chunks WHERE embedding IS NULL")
|
|
total = cur.fetchone()[0]
|
|
print(f"chunks needing embedding: {total}")
|
|
if total == 0:
|
|
return 0
|
|
|
|
done = 0
|
|
t0 = time.time()
|
|
while True:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
f"""SELECT chunk_pk, COALESCE(NULLIF({field}, ''), {other}, '')
|
|
FROM public.chunks WHERE embedding IS NULL
|
|
ORDER BY chunk_pk LIMIT %s""",
|
|
(args.batch_size,),
|
|
)
|
|
batch = cur.fetchall()
|
|
if not batch:
|
|
break
|
|
pks = [b[0] for b in batch]
|
|
texts = [b[1] or "" for b in batch]
|
|
try:
|
|
vecs = embed_batch(texts)
|
|
except Exception as e:
|
|
print(f" embed error at pk {pks[0]}: {e} — retrying once in 5s")
|
|
time.sleep(5)
|
|
vecs = embed_batch(texts)
|
|
with conn.cursor() as cur:
|
|
for pk, vec in zip(pks, vecs):
|
|
cur.execute(
|
|
"UPDATE public.chunks SET embedding = %s::vector WHERE chunk_pk = %s",
|
|
(vector_literal(vec), pk),
|
|
)
|
|
conn.commit()
|
|
done += len(batch)
|
|
if done % 320 == 0 or done >= total:
|
|
rate = done / max(1e-6, time.time() - t0)
|
|
eta = (total - done) / max(1e-6, rate)
|
|
print(f" {done}/{total} · {rate:.0f}/s · ETA {eta/60:.0f}min", flush=True)
|
|
|
|
print(f"✓ backfill complete: {done} embeddings in {(time.time()-t0)/60:.1f}min")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|