disclosure-bureau/scripts/19-detect-vision-mismatch.py

244 lines
9.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
19-detect-vision-mismatch.py — Lint pass to find Haiku exaggerations.
Detects pages whose `vision_description` claims heavy redaction/obscurity but
the actual `redactions[]` count or bbox coverage tells a milder story. Marks
flagged pages with `flags: ["vision-redaction-mismatch"]` AND optionally
re-runs vision with claude-sonnet to fix.
Heuristics (any one is enough to flag):
H1. Text contains hyperbolic redaction phrasing AND redactions[] is small.
H2. Text claims a high percentage obscured AND actual bbox area coverage is much lower.
H3. Text contradicts content_classification (e.g. says "redaction-heavy" but
content_classification doesn't include "redaction-heavy").
Usage:
./19-detect-vision-mismatch.py --doc-id <id> --page p173 [--explain]
./19-detect-vision-mismatch.py --all [--reanalyze]
./19-detect-vision-mismatch.py --all --dry-run # report only
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
try:
import yaml
except ImportError:
sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1)
# Base directory of the UFO archive on disk; the other paths in this script
# are built from it.
UFO_ROOT = Path("/Users/guto/ufo")
# Directory that holds one sub-directory per document, each with p*.md pages.
PAGES = UFO_ROOT.joinpath("wiki", "pages")
# Regexes for hyperbolic claims about redactions.
# HEAVY_RE matches phrases asserting heavy/extensive redaction as well as
# "approximately/roughly/about NN%" estimates. Only group(0) is consumed by the
# callers in this file, so the inner groups are free to change.
#
# In the "major portion(s) ... obscured" alternative, each optional part
# ("of the <noun>", "are/is") carries its own leading whitespace. Otherwise
# ordinary single-spaced phrases such as "major portions are obscured" or
# "major portion of the page is obscured" can never match.
HEAVY_RE = re.compile(
    r"(heavy\s+redact|substantial(ly)?\s+redact|extensiv(e|ely)\s+redact"
    r"|significantly\s+redact|major\s+portions?(\s+of\s+the\s+(form|page|content))?(\s+(are|is))?\s+(obscured|hidden|blacked)"
    r"|approximately\s+\d{2,3}%|roughly\s+\d{2,3}%|about\s+\d{2,3}%"
    r"|solid\s+black\s+bars|redaction-heavy|mostly\s+redact|page\s+is\s+(largely|mostly|primarily)\s+(redacted|obscured)"
    r")",
    re.IGNORECASE,
)
# PCT_RE captures the digits of the first two- or three-digit percentage.
PCT_RE = re.compile(r"(\d{2,3})\s*%", re.IGNORECASE)
# Closing front-matter delimiter: a line consisting only of "---".
_FRONT_MATTER_END_RE = re.compile(r"^---[ \t\r]*$", re.MULTILINE)


def read_fm(path: Path) -> tuple[dict, str]:
    """Split a page file into its YAML front matter and its body.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        ``(front_matter, body)``. ``front_matter`` is ``{}`` when the file does
        not start with ``---``, has no closing ``---`` line, fails to parse as
        YAML, or parses to something other than a mapping. When no front
        matter block is found the whole file content is returned as the body;
        otherwise the body is the text after the closing delimiter with
        leading newlines removed.
    """
    c = path.read_text(encoding="utf-8")
    if not c.startswith("---"):
        return {}, c
    # Only a "---" that sits on a line of its own closes the block; a "---"
    # embedded in a YAML value (e.g. "title: a---b") must not truncate it.
    # Searching from index 3 skips the opening delimiter itself.
    closing = _FRONT_MATTER_END_RE.search(c, 3)
    if closing is None:
        return {}, c
    try:
        fm = yaml.safe_load(c[3:closing.start()].strip()) or {}
    except yaml.YAMLError:
        fm = {}
    # Callers use dict methods on the result, so a scalar or list document is
    # treated the same as "no usable front matter".
    if not isinstance(fm, dict):
        fm = {}
    return fm, c[closing.end():].lstrip("\n")
def _bbox_dim(value) -> float:
    """Return *value* as a non-negative float, or 0.0 if it is not numeric."""
    try:
        number = float(value or 0)
    except (TypeError, ValueError):
        return 0.0
    return max(0.0, number)


def bbox_area_pct(redactions: list[dict]) -> float:
    """Sum of bbox areas as a percentage of the page, capped at 100.

    Each entry is expected to look like ``{"bbox": {"w": ..., "h": ...}}``;
    the width/height product is multiplied by 100, so the values are treated
    as fractions of the page. Overlapping boxes are counted once each (no
    union is computed), which is why the total is capped.

    Entries that are not mappings, whose ``bbox`` is missing or not a mapping,
    or whose ``w``/``h`` are missing, negative or non-numeric contribute 0
    instead of raising, so one malformed entry cannot abort a batch run.

    Args:
        redactions: List of redaction entries; ``None`` is treated as empty.

    Returns:
        Covered area in percent, in the range 0.0..100.0.
    """
    total = 0.0
    for entry in redactions or []:
        bbox = entry.get("bbox") if isinstance(entry, dict) else None
        if not isinstance(bbox, dict):
            continue
        total += _bbox_dim(bbox.get("w")) * _bbox_dim(bbox.get("h"))
    return min(100.0, total * 100)
def analyse_page(fm: dict) -> tuple[bool, list[str]]:
    """Run heuristics H1-H3 against one page's front matter.

    The English and Brazilian-Portuguese vision descriptions are joined and
    searched together. The result is ``(is_mismatch, reasons)``: ``reasons``
    holds one human-readable line per heuristic that fired, and
    ``is_mismatch`` is true when that list is non-empty.
    """
    combined = f"{fm.get('vision_description') or ''}\n{fm.get('vision_description_pt_br') or ''}"
    boxes = fm.get("redactions") or []
    box_count = len(boxes)
    covered_pct = bbox_area_pct(boxes)
    classes = fm.get("content_classification") or []

    hyperbole = HEAVY_RE.search(combined)
    pct_hit = PCT_RE.search(combined)
    findings: list[str] = []

    # H1: hyperbolic wording, yet fewer than five redactions recorded.
    if hyperbole is not None and box_count < 5:
        findings.append(f"H1: text says '{hyperbole.group(0)}' but only {box_count} redactions detected")

    # H2: a claimed percentage of at least 25 that is more than 2.5x the
    # summed bbox coverage.
    if pct_hit is not None:
        claimed = int(pct_hit.group(1))
        if claimed >= 25 and covered_pct < claimed * 0.4:
            findings.append(f"H2: text claims ~{claimed}% obscured but bbox area is {covered_pct:.1f}%")

    # H3: hyperbolic wording, but the classification lacks "redaction-heavy".
    if hyperbole is not None and "redaction-heavy" not in classes:
        findings.append(f"H3: text says heavy redaction but content_classification = {classes}")

    return bool(findings), findings
def run_sonnet_reanalysis(page_path: Path, fm: dict) -> dict | None:
"""Re-run vision with claude-sonnet via CLI (OAuth). Returns new fm fields or None."""
doc_id = fm.get("doc_id", "")
page_num = int(fm.get("page_number", 0))
if not doc_id or not page_num:
return None
padded = f"{page_num:03d}"
png = UFO_ROOT / "processing" / "png" / doc_id / f"p-{padded}.png"
if not png.exists():
return None
# Reuse the same prompt shape as 02-vision-page.py but ask Sonnet, and
# emphasize precise quantification of redactions.
prompt = f"""Re-analyze this US Department of War declassified UAP page with HIGH precision.
You are being run because a prior Haiku pass produced text that exaggerated the redaction coverage.
STEP 1: Use the Read tool to view this PNG: {png}
STEP 2: Output ONE JSON object (no markdown fence, no preamble) with EXACTLY these keys:
- vision_description: 2-5 sentences English. **Be precise about redaction extent**. Only say "heavy" if >30% of the page is genuinely covered by solid black bars. Count redactions accurately. Avoid hyperbole.
- vision_description_pt_br: same content in Brazilian Portuguese (preserve UTF-8 accents).
- redactions_revised: array of {{code, description, bbox: {{x,y,w,h}}}} list every actual redaction box you can see, with normalized 0..1 bbox coordinates.
- reanalysis_confidence: float 0..1.
Output ONLY the JSON. No fence."""
try:
proc = subprocess.run(
["claude", "-p", "--model", "sonnet",
"--output-format", "json",
"--max-turns", "3",
"--allowedTools", "Read",
"--add-dir", str(png.parent),
"--", prompt],
capture_output=True, text=True, timeout=180, check=False,
)
if proc.returncode != 0:
sys.stderr.write(f" Sonnet rc={proc.returncode}: {proc.stderr[-300:]}\n")
return None
cli = json.loads(proc.stdout)
if cli.get("is_error"):
return None
result_text = (cli.get("result") or "").strip()
# Strip ``` fences if any
result_text = re.sub(r"^```(?:json)?\s*", "", result_text)
result_text = re.sub(r"\s*```$", "", result_text)
return json.loads(result_text)
except Exception as e:
sys.stderr.write(f" Sonnet error: {e}\n")
return None
def process(page_path: Path, *, reanalyze: bool, dry_run: bool, explain: bool, force: bool = False) -> str:
    """Lint one page file and, when it is a mismatch, flag and/or re-analyze it.

    Args:
        page_path: Page file to inspect.
        reanalyze: Ask Sonnet for a corrected description (skipped on dry runs).
        dry_run: Report only; never write the file.
        explain: Print the reasons and a short excerpt for mismatched pages.
        force: Treat the page as a mismatch even if no heuristic fired.

    Returns:
        ``"no-fm"`` when no front matter could be read, ``"ok"`` when the page
        is not a mismatch, ``"flag-dry"`` for a mismatch on a dry run, and
        ``"flagged"`` for a mismatch otherwise (including pages whose flag was
        cleared again by a successful re-analysis).
    """
    fm, body = read_fm(page_path)
    if not fm:
        return "no-fm"
    is_mismatch, reasons = analyse_page(fm)
    if force and not is_mismatch:
        is_mismatch = True
        reasons.append("FORCED by user (heuristics did not auto-detect)")
    if not is_mismatch:
        return "ok"
    if explain:
        print(f"{page_path.relative_to(UFO_ROOT)}")
        for r in reasons:
            print(f" · {r}")
        vd = (fm.get("vision_description") or "")[:200]
        print(f" text excerpt: \"{vd}\"")
        print(f" n_redactions: {len(fm.get('redactions') or [])}, "
              f"bbox area: {bbox_area_pct(fm.get('redactions') or []):.1f}%")
    existing_flags = fm.get("flags") or []
    # A scalar "flags: something" must become a one-element list; list() on a
    # string would split it into single characters.
    flags = [existing_flags] if isinstance(existing_flags, str) else list(existing_flags)
    if "vision-redaction-mismatch" not in flags:
        flags.append("vision-redaction-mismatch")
    # Remember whether the front matter differs from what is on disk, so an
    # already-flagged page is not rewritten on every run.
    changed = fm.get("flags") != flags
    fm["flags"] = flags
    if reanalyze and not dry_run:
        print(f" → re-analyzing with Sonnet…", flush=True)
        revision = run_sonnet_reanalysis(page_path, fm)
        # Model output is untrusted: only a non-empty JSON object counts as a
        # successful revision.
        if isinstance(revision, dict) and revision:
            if revision.get("vision_description"):
                fm["vision_description"] = revision["vision_description"]
            if revision.get("vision_description_pt_br"):
                fm["vision_description_pt_br"] = revision["vision_description_pt_br"]
            revised = revision.get("redactions_revised")
            # Only a non-empty list replaces the stored redactions; any other
            # shape would break the len()/iteration done by the lint pass.
            if revised and isinstance(revised, list):
                fm["redactions"] = revised
            fm["last_reanalysis_model"] = "claude-sonnet-4-6"
            if "vision-redaction-mismatch" in fm["flags"]:
                fm["flags"].remove("vision-redaction-mismatch")
            changed = True
            print(f" ✓ rewrote vision_description (now {len(fm.get('redactions') or [])} redactions)")
        else:
            print(f" ✗ Sonnet call failed; flag preserved")
    if dry_run:
        return "flag-dry"
    if not changed:
        return "flagged"
    new_yaml = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    new = f"---\n{new_yaml}---\n\n{body}" if not body.startswith("\n") else f"---\n{new_yaml}---\n{body}"
    page_path.write_text(new, encoding="utf-8")
    return "flagged"
def main():
    """Parse the command line, lint the selected pages and print a summary.

    Target selection: ``--doc-id`` with ``--page`` picks one page file,
    ``--doc-id`` alone picks every ``p*.md`` of that document, and ``--all``
    picks every ``p*.md`` under ``PAGES``. Missing files are reported on
    stderr and skipped. Invalid argument combinations exit through
    ``ArgumentParser.error``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", help="single doc")
    ap.add_argument("--page", help="specific page stem, e.g. p173 (requires --doc-id)")
    ap.add_argument("--all", action="store_true")
    ap.add_argument("--reanalyze", action="store_true", help="invoke Sonnet to fix mismatched pages")
    ap.add_argument("--force", action="store_true", help="treat targeted pages as mismatch (bypass heuristics)")
    ap.add_argument("--dry-run", action="store_true", help="report only, don't write")
    ap.add_argument("--explain", action="store_true", help="print why each page was flagged")
    args = ap.parse_args()
    # Enforce the documented requirement: without this, "--all --page p173"
    # silently ignores --page and processes every page.
    if args.page and not args.doc_id:
        ap.error("--page requires --doc-id")
    if args.doc_id and args.page:
        targets = [PAGES / args.doc_id / f"{args.page}.md"]
    elif args.doc_id:
        targets = sorted((PAGES / args.doc_id).glob("p*.md"))
    elif args.all:
        targets = sorted(PAGES.glob("*/p*.md"))
    else:
        ap.error("provide --doc-id (+ --page) or --all")
    stats = {"ok": 0, "flagged": 0, "flag-dry": 0, "no-fm": 0}
    for p in targets:
        if not p.exists():
            sys.stderr.write(f"✗ missing: {p}\n")
            continue
        r = process(p, reanalyze=args.reanalyze, dry_run=args.dry_run, explain=args.explain, force=args.force)
        stats[r] = stats.get(r, 0) + 1
    print(f"\nDone. {stats}")


if __name__ == "__main__":
    main()