#!/usr/bin/env python3 """ 20-reanalyze-vision-gemini.py — Fallback re-vision via Gemini 3.1 Pro. When Haiku exaggerates (e.g., claims "45% redaction-heavy" on a clearly readable page), this script re-analyzes via Gemini 3.1 Pro and rewrites the page.md frontmatter (vision_description, vision_description_pt_br, redactions). Targets: --doc-id --page p173 → single page --doc-id → entire doc --flagged → all pages with flags: ["vision-redaction-mismatch"] --all → every page (slow + costly; use sparingly) Anti-hang: ThreadPoolExecutor + future.result(timeout=180s) per memory `feedback-gemini-sdk-hangs.md`. Output: overwrites page.md frontmatter fields (vision_description, vision_description_pt_br, redactions). Preserves everything else. Adds `last_reanalysis_model` and `last_reanalysis_at`. Usage: GEMINI_API_KEY=... ./20-reanalyze-vision-gemini.py --doc-id --page p173 """ from __future__ import annotations import argparse import concurrent.futures import json import os import re import sys import time from datetime import datetime, timezone from pathlib import Path try: import yaml except ImportError: sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1) try: from google import genai from google.genai import types as genai_types except ImportError: sys.stderr.write("pip3 install google-genai\n"); sys.exit(1) UFO_ROOT = Path("/Users/guto/ufo") PAGES = UFO_ROOT / "wiki" / "pages" PNG_BASE = UFO_ROOT / "processing" / "png" DEFAULT_MODEL = "gemini-3.1-pro-preview" FALLBACK = ["gemini-3-pro-preview", "gemini-3.1-flash-lite"] TIMEOUT_S = 180 PROMPT = """You are re-analyzing one page of a US Department of War declassified UAP/UFO document. A previous Haiku pass produced an EXAGGERATED description (it claimed heavy redaction coverage when the actual page was largely readable). Your job: produce a PRECISE replacement. GROUND RULES: - Count redactions EXACTLY. Each redaction is a solid black bar / opaque cover blocking specific text. - Do NOT call a page "heavy redaction" unless >30% of its visible area is genuinely obscured. - For each redaction, return a tight bbox (normalized 0..1 coords) that covers ONLY the black bar, not the whole line. - If the page has NO redactions, return an empty array. If it has thin strips, give them small bboxes. Output ONE JSON object (no fence, no preamble): { "vision_description": "2-5 sentences in English. Describe what is actually visible: layout, content category, classification markings, any redaction precisely quantified. Use plain language, no hyperbole.", "vision_description_pt_br": "Mesmo conteúdo em português brasileiro (pt-br). Preserve acentos UTF-8. Mantenha citações verbatim do documento em inglês (não traduza texto que está dentro do documento).", "redactions": [ {"code": "(b)(1) 1.4(a)|(b)(3)|(b)(6)|other", "description": "what field/text was obscured", "bbox": {"x": 0.0, "y": 0.0, "w": 0.0, "h": 0.0}, "text_inferred": null} ], "content_classification": ["text-only"|"contains-photos"|"contains-sketches"|"contains-diagrams"|"contains-maps"|"contains-tables"|"contains-signatures"|"contains-stamps"|"redaction-heavy"|"mixed"|"blank"], "page_type": "cover|toc|body|signature|photo|sketch|map|stamp|blank|appendix|redaction-heavy|table-page|mixed", "reanalysis_confidence": 0.0 } """ def utc_now_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def read_fm(path: Path) -> tuple[dict, str]: c = path.read_text(encoding="utf-8") if not c.startswith("---"): return {}, c end = c.find("---", 4) if end < 0: return {}, c try: fm = yaml.safe_load(c[3:end].strip()) or {} except yaml.YAMLError: fm = {} return fm, c[end + 3 :].lstrip("\n") def write_fm(path: Path, fm: dict, body: str) -> None: new_yaml = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False) sep = "\n" if body.startswith("\n") else "\n\n" path.write_text(f"---\n{new_yaml}---{sep}{body}", encoding="utf-8") def call_gemini(client, png_path: Path, model: str, attempt: int = 1): """Vision call with thread-based timeout (anti-hang).""" content = [ genai_types.Part.from_bytes(data=png_path.read_bytes(), mime_type="image/png"), PROMPT, ] def _call(): return client.models.generate_content( model=model, contents=content, config=genai_types.GenerateContentConfig( response_mime_type="application/json", temperature=0.2, max_output_tokens=16384, # bumped iteratively (4096 → 8192 → 16384) for verbose pages ), ) try: with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: future = ex.submit(_call) try: resp = future.result(timeout=TIMEOUT_S) except concurrent.futures.TimeoutError: raise RuntimeError(f"Gemini hung >{TIMEOUT_S}s") return resp.text, model except Exception as e: if attempt < len(FALLBACK) + 1: next_m = FALLBACK[attempt - 1] if attempt <= len(FALLBACK) else None if next_m: sys.stderr.write(f" ⚠ {model} failed ({type(e).__name__}); fallback → {next_m}\n") return call_gemini(client, png_path, next_m, attempt + 1) raise def parse_json_lenient(text: str) -> dict: t = text.strip() t = re.sub(r"^```(?:json)?\s*", "", t) t = re.sub(r"\s*```$", "", t) return json.loads(t) def process_page(client, page_md: Path, dry_run: bool) -> str: fm, body = read_fm(page_md) if not fm: return "no-fm" doc_id = fm.get("doc_id", "") page_num = int(fm.get("page_number", 0)) if not doc_id or not page_num: return "bad-fm" padded = f"{page_num:03d}" png = PNG_BASE / doc_id / f"p-{padded}.png" if not png.exists(): return "no-png" print(f" → {page_md.relative_to(UFO_ROOT)} (Gemini 3.1 Pro)", flush=True) t0 = time.time() try: raw, model_used = call_gemini(client, png, DEFAULT_MODEL) except Exception as e: print(f" ✗ Gemini failed: {type(e).__name__}: {e}", flush=True) return "error" dt = time.time() - t0 try: revision = parse_json_lenient(raw) except json.JSONDecodeError as e: print(f" ✗ JSON parse failed: {e}; raw[:200]={raw[:200]!r}", flush=True) return "bad-json" # Before/after summary old_n = len(fm.get("redactions") or []) new_n = len(revision.get("redactions") or []) old_desc = (fm.get("vision_description") or "")[:90] new_desc = (revision.get("vision_description") or "")[:90] print(f" redactions: {old_n} → {new_n}") print(f" OLD desc: {old_desc}…") print(f" NEW desc: {new_desc}…") if dry_run: return "dry" # Apply revision if revision.get("vision_description"): fm["vision_description"] = revision["vision_description"] if revision.get("vision_description_pt_br"): fm["vision_description_pt_br"] = revision["vision_description_pt_br"] if "redactions" in revision: fm["redactions"] = revision["redactions"] if revision.get("content_classification"): fm["content_classification"] = revision["content_classification"] if revision.get("page_type"): fm["page_type"] = revision["page_type"] fm["last_reanalysis_model"] = model_used fm["last_reanalysis_at"] = utc_now_iso() fm["last_reanalysis_confidence"] = revision.get("reanalysis_confidence") # Remove the mismatch flag now that it's been corrected flags = list(fm.get("flags") or []) if "vision-redaction-mismatch" in flags: flags.remove("vision-redaction-mismatch") fm["flags"] = flags write_fm(page_md, fm, body) print(f" ✓ wrote (took {dt:.1f}s)", flush=True) return "ok" def main(): global DEFAULT_MODEL ap = argparse.ArgumentParser() ap.add_argument("--doc-id") ap.add_argument("--page", help="specific page stem, e.g. p173 (requires --doc-id)") ap.add_argument("--flagged", action="store_true", help="all pages with vision-redaction-mismatch") ap.add_argument("--redaction-heavy", action="store_true", help="all pages currently classified redaction-heavy (re-triage)") ap.add_argument("--all", action="store_true") ap.add_argument("--pages-file", help="newline-separated list of page paths (relative to /Users/guto/ufo/ or absolute)") ap.add_argument("--model", default=DEFAULT_MODEL, help=f"override model (default {DEFAULT_MODEL})") ap.add_argument("--workers", type=int, default=1, help="parallel workers (raise for Flash Lite, keep 1 for Pro free tier)") ap.add_argument("--max", type=int, default=0, help="cap targets (0 = unlimited)") ap.add_argument("--dry-run", action="store_true") args = ap.parse_args() api_key = os.environ.get("GEMINI_API_KEY") if not api_key: sys.stderr.write("✗ GEMINI_API_KEY not set\n"); sys.exit(1) client = genai.Client(api_key=api_key) if args.doc_id and args.page: targets = [PAGES / args.doc_id / f"{args.page}.md"] elif args.doc_id: targets = sorted((PAGES / args.doc_id).glob("p*.md")) elif args.flagged: targets = [] for p in PAGES.glob("*/p*.md"): fm, _ = read_fm(p) if "vision-redaction-mismatch" in (fm.get("flags") or []): targets.append(p) elif args.redaction_heavy: targets = [] for p in PAGES.glob("*/p*.md"): fm, _ = read_fm(p) if "redaction-heavy" in (fm.get("content_classification") or []): targets.append(p) elif args.all: targets = sorted(PAGES.glob("*/p*.md")) elif args.pages_file: targets = [] for line in Path(args.pages_file).read_text().splitlines(): s = line.strip() if not s: continue p = Path(s) if s.startswith("/") else UFO_ROOT / s targets.append(p) else: ap.error("provide --doc-id (+ --page), --flagged, --redaction-heavy, --all, or --pages-file") if args.max: targets = targets[:args.max] DEFAULT_MODEL = args.model print(f"Processing {len(targets)} page(s) with {DEFAULT_MODEL} ({args.workers} worker(s))...") stats = {"ok": 0, "error": 0, "dry": 0, "no-png": 0, "no-fm": 0, "bad-fm": 0, "bad-json": 0} if args.workers <= 1: for p in targets: if not p.exists(): stats["no-fm"] += 1; continue r = process_page(client, p, args.dry_run) stats[r] = stats.get(r, 0) + 1 else: import concurrent.futures as cf with cf.ThreadPoolExecutor(max_workers=args.workers) as pool: futs = {pool.submit(process_page, client, p, args.dry_run): p for p in targets if p.exists()} for fut in cf.as_completed(futs): try: r = fut.result() stats[r] = stats.get(r, 0) + 1 except Exception as e: sys.stderr.write(f"✗ {futs[fut]}: {e}\n") stats["error"] += 1 print(f"\nDone. {stats}") if __name__ == "__main__": main()