disclosure-bureau/scripts/19-detect-vision-mismatch.py

243 lines
9.5 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
19-detect-vision-mismatch.py — Lint pass to find Haiku exaggerations.

Detects pages whose `vision_description` claims heavy redaction/obscurity while
the actual `redactions[]` count or bbox coverage tells a milder story. Flagged
pages get `flags: ["vision-redaction-mismatch"]` and, optionally, vision is
re-run with claude-sonnet to correct the description.

Heuristics (any one is enough to flag):
  H1. Text contains hyperbolic redaction phrasing AND redactions[] has fewer
      than 5 entries.
  H2. Text claims at least 25% obscured AND the actual bbox area coverage is
      below 40% of the claimed percentage.
  H3. Text contains hyperbolic redaction phrasing (e.g. "redaction-heavy") but
      content_classification doesn't include "redaction-heavy".

Usage:
  ./19-detect-vision-mismatch.py --doc-id <id> --page p173 [--explain]
  ./19-detect-vision-mismatch.py --doc-id <id> --page p173 --force --reanalyze
  ./19-detect-vision-mismatch.py --all [--reanalyze]
  ./19-detect-vision-mismatch.py --all --dry-run   # report only
"""
from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
from pathlib import Path

try:
    import yaml
except ImportError:
    sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1)
# Root of the archive checkout. Defaults to the original hard-coded location;
# set the UFO_ROOT environment variable to run against a different checkout.
UFO_ROOT = Path(os.environ.get("UFO_ROOT") or "/Users/guto/ufo")
# Page files live at PAGES/<doc-id>/p*.md.
PAGES = UFO_ROOT / "wiki" / "pages"
# Regexes for hyperbolic claims about redactions.
#
# Notes on the "major portion(s) ..." alternative: every optional part carries
# its own leading whitespace, so all of these match:
#   "major portions obscured", "major portions are hidden",
#   "major portion of the page is obscured".
HEAVY_RE = re.compile(
    r"(heav(y|ily)\s+redact|substantial(ly)?\s+redact|extensiv(e|ely)\s+redact"
    r"|significantly\s+redact"
    r"|major\s+portions?(\s+of\s+the\s+(form|page|content))?(\s+(are|is))?\s+(obscured|hidden|blacked)"
    r"|approximately\s+\d{2,3}%|roughly\s+\d{2,3}%|about\s+\d{2,3}%"
    r"|solid\s+black\s+bars|redaction-heavy|mostly\s+redact|page\s+is\s+(largely|mostly|primarily)\s+(redacted|obscured)"
    r")",
    re.IGNORECASE,
)
# A whole two- or three-digit percentage; group(1) is the number. The
# lookbehind rejects the tail of a longer or decimal number, so "0.25%" is not
# read as 25 and "1250%" is not read as 250.
PCT_RE = re.compile(r"(?<![\d.])(\d{2,3})\s*%", re.IGNORECASE)
# Closing front-matter delimiter: a line consisting solely of `---`.
_FM_CLOSE_RE = re.compile(r"^---[ \t]*$", re.MULTILINE)


def read_fm(path: Path) -> tuple[dict, str]:
    """Split a page file into its YAML front matter and the remaining body.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        ``(front_matter, body)``. ``front_matter`` is ``{}`` when the file
        does not start with ``---``, has no closing ``---`` line, contains
        invalid YAML, or the YAML is not a mapping. When no front-matter
        block is found the whole file content is returned as ``body``;
        otherwise ``body`` is the text after the closing delimiter with
        leading newlines removed.
    """
    content = path.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return {}, content

    # Search from just past the opening delimiter. With an explicit start
    # position `^` only matches after a newline, so a `---` embedded inside a
    # YAML value is not mistaken for the end of the block.
    closing = _FM_CLOSE_RE.search(content, 3)
    if closing is None:
        return {}, content

    try:
        fm = yaml.safe_load(content[3:closing.start()].strip()) or {}
    except yaml.YAMLError:
        fm = {}
    # Callers use `fm.get(...)`; a top-level list or scalar is not usable.
    if not isinstance(fm, dict):
        fm = {}
    return fm, content[closing.end():].lstrip("\n")
def bbox_area_pct(redactions: list[dict]) -> float:
    """Return the summed bbox area of *redactions* as a percentage, capped at 100.

    Each entry is expected to look like ``{"bbox": {"w": ..., "h": ...}}``.
    The ``w * h`` products are summed and multiplied by 100, so the result is
    a page percentage only if ``w`` and ``h`` are fractions of the page
    (presumably 0..1 — verify against the producer of ``redactions``).
    Overlapping boxes are counted once each; only the final sum is capped.

    Entries that are not dicts, whose ``bbox`` is not a dict, or whose
    ``w``/``h`` cannot be converted to a number contribute nothing instead of
    raising, so one malformed record cannot abort a whole run. Negative
    dimensions are treated as 0. ``None`` is accepted and yields 0.0.
    """
    total = 0.0
    for entry in redactions or []:
        bbox = entry.get("bbox") if isinstance(entry, dict) else None
        if not isinstance(bbox, dict):
            continue
        try:
            width = float(bbox.get("w") or 0)
            height = float(bbox.get("h") or 0)
        except (TypeError, ValueError):
            continue
        total += max(0.0, width) * max(0.0, height)
    return min(100.0, total * 100)
def analyse_page(fm: dict) -> tuple[bool, list[str]]:
    """Run heuristics H1-H3 on one page's front matter.

    Both the English and the pt-BR vision descriptions are scanned together.

    Returns:
        ``(is_mismatch, reasons)`` where ``reasons`` holds one message per
        heuristic that fired and ``is_mismatch`` is True iff it is non-empty.
    """
    description_en = fm.get("vision_description") or ""
    description_pt = fm.get("vision_description_pt_br") or ""
    combined = f"{description_en}\n{description_pt}"

    boxes = fm.get("redactions") or []
    box_count = len(boxes)
    coverage = bbox_area_pct(boxes)
    classification = fm.get("content_classification") or []

    hyperbole = HEAVY_RE.search(combined)
    percent = PCT_RE.search(combined)

    findings: list[str] = []

    # H1: hyperbolic wording, yet fewer than 5 redactions recorded.
    if hyperbole is not None and box_count < 5:
        findings.append(f"H1: text says '{hyperbole.group(0)}' but only {box_count} redactions detected")

    # H2: a claimed percentage of 25 or more that exceeds 2.5x the measured
    # bbox coverage. Only the first percentage found in the text is used.
    if percent is not None:
        claimed = int(percent.group(1))
        if claimed >= 25 and coverage < claimed * 0.4:
            findings.append(f"H2: text claims ~{claimed}% obscured but bbox area is {coverage:.1f}%")

    # H3: hyperbolic wording, but the classification lacks "redaction-heavy".
    if hyperbole is not None and "redaction-heavy" not in classification:
        findings.append(f"H3: text says heavy redaction but content_classification = {classification}")

    return bool(findings), findings
def run_sonnet_reanalysis(page_path: Path, fm: dict) -> dict | None:
"""Re-run vision with claude-sonnet via CLI (OAuth). Returns new fm fields or None."""
doc_id = fm.get("doc_id", "")
page_num = int(fm.get("page_number", 0))
if not doc_id or not page_num:
return None
padded = f"{page_num:03d}"
png = UFO_ROOT / "processing" / "png" / doc_id / f"p-{padded}.png"
if not png.exists():
return None
# Reuse the same prompt shape as 02-vision-page.py but ask Sonnet, and
# emphasize precise quantification of redactions.
prompt = f"""Re-analyze this US Department of War declassified UAP page with HIGH precision.
You are being run because a prior Haiku pass produced text that exaggerated the redaction coverage.
STEP 1: Use the Read tool to view this PNG: {png}
STEP 2: Output ONE JSON object (no markdown fence, no preamble) with EXACTLY these keys:
- vision_description: 2-5 sentences English. **Be precise about redaction extent**. Only say "heavy" if >30% of the page is genuinely covered by solid black bars. Count redactions accurately. Avoid hyperbole.
- vision_description_pt_br: same content in Brazilian Portuguese (preserve UTF-8 accents).
- redactions_revised: array of {{code, description, bbox: {{x,y,w,h}}}} — list every actual redaction box you can see, with normalized 0..1 bbox coordinates.
- reanalysis_confidence: float 0..1.
Output ONLY the JSON. No fence."""
try:
proc = subprocess.run(
["claude", "-p", "--model", "sonnet",
"--output-format", "json",
"--max-turns", "3",
"--allowedTools", "Read",
"--add-dir", str(png.parent),
"--", prompt],
capture_output=True, text=True, timeout=180, check=False,
)
if proc.returncode != 0:
sys.stderr.write(f" Sonnet rc={proc.returncode}: {proc.stderr[-300:]}\n")
return None
cli = json.loads(proc.stdout)
if cli.get("is_error"):
return None
result_text = (cli.get("result") or "").strip()
# Strip ``` fences if any
result_text = re.sub(r"^```(?:json)?\s*", "", result_text)
result_text = re.sub(r"\s*```$", "", result_text)
return json.loads(result_text)
except Exception as e:
sys.stderr.write(f" Sonnet error: {e}\n")
return None
def process(page_path: Path, *, reanalyze: bool, dry_run: bool, explain: bool, force: bool = False) -> str:
    """Lint one page file and, unless dry-running, write the outcome back.

    Args:
        page_path: Page file with YAML front matter.
        reanalyze: Ask Sonnet for a corrected description of a mismatched
            page (has no effect when *dry_run* is set).
        dry_run: Report only; *page_path* is never written.
        explain: Print the reasons and a short excerpt for a flagged page.
        force: Treat the page as a mismatch even when no heuristic fires.

    Returns:
        ``"no-fm"`` when no front matter could be read, ``"ok"`` when the
        page is not a mismatch, ``"flag-dry"`` for a mismatch in dry-run
        mode, otherwise ``"flagged"`` (the file has been rewritten, whether
        the flag was kept or cleared by a successful re-analysis).
    """
    flag = "vision-redaction-mismatch"

    fm, body = read_fm(page_path)
    if not fm:
        return "no-fm"

    is_mismatch, reasons = analyse_page(fm)
    if force and not is_mismatch:
        is_mismatch = True
        reasons.append("FORCED by user (heuristics did not auto-detect)")
    if not is_mismatch:
        return "ok"

    if explain:
        print(f"{page_path.relative_to(UFO_ROOT)}")
        for r in reasons:
            print(f" · {r}")
        vd = (fm.get("vision_description") or "")[:200]
        print(f" text excerpt: \"{vd}\"")
        print(f" n_redactions: {len(fm.get('redactions') or [])}, "
              f"bbox area: {bbox_area_pct(fm.get('redactions') or []):.1f}%")

    # Normalise `flags` to a list. A scalar such as `flags: some-flag` must be
    # wrapped, not passed to list(), which would split it into characters.
    existing = fm.get("flags") or []
    flags = list(existing) if isinstance(existing, (list, tuple)) else [existing]
    if flag not in flags:
        flags.append(flag)
    fm["flags"] = flags

    if reanalyze and not dry_run:
        print(" → re-analyzing with Sonnet…", flush=True)
        revision = run_sonnet_reanalysis(page_path, fm)
        new_description = revision.get("vision_description") if isinstance(revision, dict) else None
        # Clear the flag only when a replacement description actually came
        # back; a reply without one leaves the exaggerated text in place.
        if isinstance(new_description, str) and new_description.strip():
            fm["vision_description"] = new_description
            if revision.get("vision_description_pt_br"):
                fm["vision_description_pt_br"] = revision["vision_description_pt_br"]
            revised = revision.get("redactions_revised")
            # Replace only with a non-empty list; any other shape would
            # corrupt the `redactions` field.
            if isinstance(revised, list) and revised:
                fm["redactions"] = revised
            fm["last_reanalysis_model"] = "claude-sonnet-4-6"
            if flag in fm["flags"]:
                fm["flags"].remove(flag)
            print(f" ✓ rewrote vision_description (now {len(fm.get('redactions') or [])} redactions)")
        else:
            print(" ✗ Sonnet call failed; flag preserved")

    if dry_run:
        return "flag-dry"

    new_yaml = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    # Keep exactly one blank line between the front matter and the body.
    separator = "\n" if body.startswith("\n") else "\n\n"
    page_path.write_text(f"---\n{new_yaml}---{separator}{body}", encoding="utf-8")
    return "flagged"
def main():
    """Parse the command line, process the selected pages and print a tally.

    Target selection: ``--doc-id`` with ``--page`` picks one page file,
    ``--doc-id`` alone picks every ``p*.md`` of that document, and ``--all``
    picks every ``p*.md`` under PAGES. Exits with an argparse usage error when
    none of these is given or when ``--page`` is used without ``--doc-id``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", help="single doc")
    ap.add_argument("--page", help="specific page stem, e.g. p173 (requires --doc-id)")
    ap.add_argument("--all", action="store_true")
    ap.add_argument("--reanalyze", action="store_true", help="invoke Sonnet to fix mismatched pages")
    ap.add_argument("--force", action="store_true", help="treat targeted pages as mismatch (bypass heuristics)")
    ap.add_argument("--dry-run", action="store_true", help="report only, don't write")
    ap.add_argument("--explain", action="store_true", help="print why each page was flagged")
    args = ap.parse_args()

    # Without this check `--all --page p173` would silently ignore --page and
    # process every page of every document.
    if args.page and not args.doc_id:
        ap.error("--page requires --doc-id")

    if args.doc_id and args.page:
        targets = [PAGES / args.doc_id / f"{args.page}.md"]
    elif args.doc_id:
        targets = sorted((PAGES / args.doc_id).glob("p*.md"))
    elif args.all:
        targets = sorted(PAGES.glob("*/p*.md"))
    else:
        ap.error("provide --doc-id (+ --page) or --all")

    stats = {"ok": 0, "flagged": 0, "flag-dry": 0, "no-fm": 0}
    for p in targets:
        if not p.exists():
            sys.stderr.write(f"✗ missing: {p}\n")
            continue
        r = process(p, reanalyze=args.reanalyze, dry_run=args.dry_run, explain=args.explain, force=args.force)
        stats[r] = stats.get(r, 0) + 1
    print(f"\nDone. {stats}")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()