disclosure-bureau/scripts/24-document-synthesis.py

393 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
"""
24-document-synthesis.py — Cross-page document synthesis via Claude Sonnet 4.6.
Until now `wiki/documents/<doc-id>.md` was a DUMB union of per-page frontmatter
(page count, classification stats, entity union). Useless as a narrative.
This script READS THE WHOLE DOCUMENT (all OCR pages + vision descriptions +
entity refs) and asks Sonnet 4.6 (via Claude Code OAuth, $0 on Max) to produce:
- executive_summary_en (2-4 paragraphs, what the doc IS and what it claims)
- executive_summary_pt_br (Brazilian Portuguese version)
- narrative_arc_en (story across pages)
- narrative_arc_pt_br
- central_characters[] (top 3-7 people, their role + arc)
- key_events_timeline[] (date + label + page refs)
- key_locations[] (with significance)
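- uap_objects_described[] (shape, color, behavior, page)
- strategic_significance + strategic_significance_pt_br (why this doc matters to the corpus)
- synthesis_confidence_band + synthesis_caveats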
Output replaces the entire body of wiki/documents/<doc-id>.md. Frontmatter is
preserved + augmented with these synthesis fields.
Usage:
./24-document-synthesis.py --doc-id <id> # one doc smoke test
./24-document-synthesis.py --all # all 116 docs
./24-document-synthesis.py --all --max 5 # cap for testing (needs --all or --doc-id)
./24-document-synthesis.py --all --skip-existing # don't redo docs that already have synthesis
"""
from __future__ import annotations
import argparse
import concurrent.futures
import json
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
try:
import yaml
except ImportError:
sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1)
# Absolute root of the archive checkout; every path below is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")
# Per-document markdown files (`<doc-id>.md`) whose body this script rewrites.
DOCS = UFO_ROOT / "wiki" / "documents"
# Per-page markdown files, one sub-directory per document id.
PAGES_DIR = UFO_ROOT / "wiki" / "pages"
# Per-page OCR text files, read as `<doc-id>/p-NNN.txt`.
OCR_BASE = UFO_ROOT / "processing" / "ocr"
# Run log that main() appends a summary entry to after a successful run.
LOG_PATH = UFO_ROOT / "wiki" / "log.md"
MODEL = "sonnet" # claude-sonnet-4-6 via Claude Code OAuth
# Value passed to the CLI's --max-turns flag.
MAX_TURNS = 3
# Time budget, in seconds, for a single Claude CLI call.
TIMEOUT_S = 240
MAX_INPUT_CHARS = 320_000 # cap input size; Sonnet 200K tokens ≈ 800K chars safe
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    stamp = datetime.now(tz=timezone.utc)
    return stamp.strftime("%Y-%m-%dT%H:%M:%SZ")
def read_fm(path: Path) -> tuple[dict, str]:
    """Split a markdown file into its YAML frontmatter and its body.

    Args:
        path: Markdown file to read (decoded as UTF-8).

    Returns:
        ``(frontmatter, body)``. ``frontmatter`` is ``{}`` when the file has no
        opening ``---``, no closing ``---`` line, unparseable YAML, or YAML that
        is not a mapping. When no frontmatter block is found the whole file
        content is returned as ``body``; otherwise ``body`` is the text after
        the closing fence with leading newlines removed.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}, text
    first_nl = text.find("\n")
    if first_nl < 0:
        return {}, text
    # The closing fence must be a line of its own. A bare substring search
    # would stop at the first "---" inside a YAML value (e.g. "title: a---b")
    # and cut the frontmatter in half.
    closing = re.compile(r"^---[ \t]*$", re.MULTILINE).search(text, first_nl + 1)
    if closing is None:
        return {}, text
    try:
        fm = yaml.safe_load(text[first_nl + 1:closing.start()]) or {}
    except yaml.YAMLError:
        fm = {}
    if not isinstance(fm, dict):
        # Callers use fm.get(...); a YAML list/scalar would crash them.
        fm = {}
    return fm, text[closing.end():].lstrip("\n")
def write_fm(path: Path, fm: dict, body: str) -> None:
    """Write *fm* as a YAML frontmatter block followed by *body* to *path*.

    The file is encoded as UTF-8 and any existing content is overwritten.
    """
    front = yaml.dump(fm, default_flow_style=False, sort_keys=False, allow_unicode=True)
    # Guarantee at least one blank line between the closing fence and the body.
    if body.startswith("\n"):
        gap = "\n"
    else:
        gap = "\n\n"
    document = "".join(("---\n", front, "---", gap, body))
    path.write_text(document, encoding="utf-8")
def _ocr_page_number(page_number, stem: str) -> int:
    """Pick the page number used to locate a page's OCR file.

    Prefers the frontmatter ``page_number`` (int or ASCII-digit string), then
    the first run of digits in the page filename, and only then page 1.
    """
    if isinstance(page_number, int):
        return int(page_number)
    if isinstance(page_number, str) and re.fullmatch(r"[0-9]+", page_number):
        return int(page_number)
    in_stem = re.search(r"[0-9]+", stem)
    if in_stem:
        return int(in_stem.group())
    # Last resort keeps the historical default of page 1.
    return 1


def assemble_doc_payload(doc_id: str) -> tuple[str, dict] | None:
    """Concatenate every page of a document into one text payload for the model.

    For each ``p*.md`` page file (in sorted filename order) the payload
    contains the page type, the vision description, up to 15 entity names per
    entity class, and an excerpt of the page's OCR text (at most 4000
    characters and 120 lines).

    Args:
        doc_id: Document identifier; names the document markdown file, the
            page directory and the OCR directory.

    Returns:
        ``(payload, meta)`` where ``meta`` holds ``doc_id``, ``page_count`` and
        ``input_chars``; ``None`` when the document file is missing or it has
        no page files.
    """
    doc_md = DOCS / f"{doc_id}.md"
    if not doc_md.exists():
        return None
    doc_fm, _ = read_fm(doc_md)
    page_files = sorted((PAGES_DIR / doc_id).glob("p*.md"))
    if not page_files:
        return None

    lines: list[str] = [
        f"# DOCUMENT: {doc_id}",
        f"# Canonical title: {doc_fm.get('canonical_title', doc_id)}",
        f"# Collection: {doc_fm.get('collection', '?')}",
        f"# Total pages: {len(page_files)}",
        "",
    ]
    for pf in page_files:
        page_fm, _ = read_fm(pf)
        raw_num = page_fm.get("page_number")
        # A null page_number cannot take the ">3" format spec; show "?" instead.
        page_label = "?" if raw_num is None else str(raw_num)
        lines.append(f"\n===== PAGE {page_label:>3} ({pf.stem}) =====")
        if page_fm.get("page_type"):
            lines.append(f" page_type: {page_fm['page_type']}")
        if page_fm.get("vision_description"):
            lines.append(f" vision (EN): {page_fm['vision_description']}")

        entities = page_fm.get("entities_extracted")
        if isinstance(entities, dict):
            for cls in ("people", "organizations", "locations", "events", "uap_objects"):
                entries = entities.get(cls)
                if not isinstance(entries, list):
                    continue
                names = []
                for entry in entries:
                    if not isinstance(entry, dict):
                        continue
                    name = entry.get("name") or entry.get("label")
                    if name:
                        # str() so a numeric name cannot break the join below.
                        names.append(str(name))
                if names:
                    lines.append(f" {cls}: {', '.join(names[:15])}")

        # OCR (truncate per-page for very large docs)
        ocr_num = _ocr_page_number(raw_num, pf.stem)
        ocr_path = OCR_BASE / doc_id / f"p-{ocr_num:03d}.txt"
        if ocr_path.exists():
            ocr = ocr_path.read_text(encoding="utf-8", errors="replace")
            ocr_lines = ocr[:4000].split("\n")
            clipped = len(ocr) > 4000 or len(ocr_lines) > 120
            lines.append(" OCR:")
            lines.extend(f" {ln}" for ln in ocr_lines[:120])
            if clipped:
                # Emitted after the line cap so the marker is never cut off.
                lines.append(f" […page truncated, {len(ocr)} chars total]")

    payload = "\n".join(lines)
    if len(payload) > MAX_INPUT_CHARS:
        payload = payload[:MAX_INPUT_CHARS] + f"\n\n[…truncated to {MAX_INPUT_CHARS} chars; doc has more]"
    meta = {
        "doc_id": doc_id,
        "page_count": len(page_files),
        "input_chars": len(payload),
    }
    return payload, meta
# Prompt sent to the model by call_sonnet(). It is filled with str.format, so
# `{payload}` is the only placeholder and every literal brace of the JSON
# schema below is doubled (`{{` / `}}`).
PROMPT_TEMPLATE = """You are Sherlock, lead investigator of The Disclosure Bureau. Your task: produce a coherent SYNTHESIS of an entire US Department of War declassified UAP/UFO document by reading ALL its pages together.
The dump below contains, for each page: vision description, extracted entities, and OCR text. Treat this as ONE coherent document, not a list of pages. Build a narrative arc, identify central characters, key events, themes.
DOCUMENT DUMP:
========================================
{payload}
========================================
Output ONE JSON object (no markdown fence, no preamble). Schema:
{{
"executive_summary_en": "2-4 paragraphs in English. What this document IS, what it claims, who produced it, what it covers, why it exists. CITE specific page numbers like (p005, p023). Use Tetlock confidence bands sparingly: 'high', 'medium', 'low', 'speculation'.",
"executive_summary_pt_br": "Mesmo conteúdo em português brasileiro (pt-br, NÃO European). Preserve UTF-8 accents (ç, ã, é etc.). Mantenha citações verbatim do documento (em inglês) sem traduzir.",
"narrative_arc_en": "3-6 paragraphs telling the story the document tells, as it unfolds across pages. Reference page numbers as (p007). Stay grounded — only what's actually in the document.",
"narrative_arc_pt_br": "Mesmo em pt-br.",
"central_characters": [
{{"name": "Canonical full name", "role": "what they do in this doc", "arc": "how they appear across pages", "pages": "p001, p007-p012"}}
],
"key_events_timeline": [
{{"date": "YYYY-MM-DD or YYYY or 'undated'", "event": "short factual description", "pages": "p005"}}
],
"key_locations": [
{{"name": "Location name", "significance": "why it matters in this doc", "pages": "p007"}}
],
"uap_objects_described": [
{{"shape": "...", "color": "...", "behavior": "key observed behaviors", "page": "p007"}}
],
"strategic_significance": "1 paragraph English. Why this document matters for understanding the wider UAP archive — what unique evidence or claim it brings, what it confirms or contradicts of other reports. Use confidence_band.",
"strategic_significance_pt_br": "Mesmo em pt-br.",
"synthesis_confidence_band": "high | medium | low | speculation",
"synthesis_caveats": "any notes about gaps, low OCR quality, redaction extent, contradictions you noticed"
}}
Rules:
- All entity names and verbatim quotes stay in their ORIGINAL language.
- Brazilian Portuguese (NOT European). Preserve UTF-8.
- Cite page numbers like (p007) so readers can verify.
- If document is genuinely thin (e.g., just a memo without much content), say so explicitly in executive_summary.
- Output ONLY the JSON. No fence, no preamble."""
def _first_json_value(text: str):
    """Parse *text* as JSON, tolerating a preamble and trailing chatter.

    Falls back to decoding the value that starts at the first ``{``. The
    decoder understands string literals, so braces inside quoted text do not
    confuse it, and anything after the closing brace is ignored.

    Raises:
        json.JSONDecodeError: if neither attempt yields valid JSON.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        start = text.find("{")
        if start < 0:
            raise
    # Only the FIRST "{" is tried: scanning on to later braces could return a
    # nested fragment of a truncated reply as if it were the whole answer.
    value, _end = json.JSONDecoder().raw_decode(text, start)
    return value


def call_sonnet(payload: str) -> dict:
    """Send the assembled document dump to the Claude CLI and return its JSON.

    Args:
        payload: Document dump substituted into ``PROMPT_TEMPLATE``.

    Returns:
        The synthesis object produced by the model.

    Raises:
        RuntimeError: the CLI ran longer than ``TIMEOUT_S`` seconds, exited
            non-zero, reported an error, or answered with something other than
            a JSON object.
        json.JSONDecodeError: the CLI output or the model's answer is not
            parseable JSON.
    """
    prompt = PROMPT_TEMPLATE.format(payload=payload)
    # NOTE(review): the whole prompt travels as one argv element; very large
    # payloads may hit the OS argument-size limit — consider feeding it on
    # stdin if the CLI supports that.
    cmd = [
        "claude", "-p", "--model", MODEL,
        "--output-format", "json", "--max-turns", str(MAX_TURNS),
        "--", prompt,
    ]
    try:
        # subprocess enforces the deadline itself and kills the child, so no
        # watchdog thread is needed (one would only delay the timeout while
        # its executor waits for the worker on exit).
        res = subprocess.run(
            cmd,
            capture_output=True, encoding="utf-8", timeout=TIMEOUT_S, check=False,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"sonnet hung >{TIMEOUT_S}s") from exc
    if res.returncode != 0:
        raise RuntimeError(f"claude rc={res.returncode}: {res.stderr[-500:]}")

    cli = json.loads(res.stdout)
    if not isinstance(cli, dict):
        raise RuntimeError(f"claude error: unexpected CLI output type {type(cli).__name__}")
    if cli.get("is_error"):
        # "result" may be null on errors; coerce before slicing.
        raise RuntimeError(f"claude error: {str(cli.get('result') or '')[:300]}")

    text = str(cli.get("result") or "").strip()
    # Drop a markdown code fence if the model added one despite instructions.
    text = re.sub(r"^```(?:json)?\s*", "", text)
    text = re.sub(r"\s*```$", "", text)

    synthesis = _first_json_value(text)
    if not isinstance(synthesis, dict):
        raise RuntimeError(
            f"claude error: expected a JSON object, got {type(synthesis).__name__}"
        )
    return synthesis
def _as_text(value, fallback: str = "") -> str:
    """Coerce a model-supplied value to text; null or blank gives *fallback*."""
    if value is None:
        return fallback
    rendered = value if isinstance(value, str) else str(value)
    return rendered if rendered.strip() else fallback


def _table_cell(value, fallback: str = "") -> str:
    """Flatten *value* to one line and escape ``|`` for a Markdown table cell."""
    flat = " ".join(_as_text(value, fallback).split())
    return flat.replace("|", "\\|")


def _dict_items(value) -> list:
    """Keep only the dict entries of a model-supplied list."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def render_body(synthesis: dict) -> str:
    """Build the markdown body from the JSON synthesis.

    The model's output is not guaranteed to follow the schema, so null or
    blank fields fall back to a placeholder, list entries that are not objects
    are skipped, and table cells are flattened and pipe-escaped.

    Args:
        synthesis: Parsed synthesis object (keys as requested in the prompt).

    Returns:
        The markdown document body, ending with a newline.
    """
    lines = []

    # Heading = first sentence of the English summary, capped at 80 chars.
    summary = _as_text(synthesis.get("executive_summary_en"), "Document Synthesis")
    headline = " ".join(summary.split(".", 1)[0].split())[:80]
    lines.append(f"# {headline or 'Document Synthesis'}")
    lines.append("")

    band = _as_text(synthesis.get("synthesis_confidence_band"))
    if band:
        lines.append(f"> **Synthesis confidence: `{band}`**")
        lines.append("")
    caveats = _as_text(synthesis.get("synthesis_caveats"))
    if caveats:
        # Re-quote continuation lines so multi-line caveats stay in the quote.
        quoted = caveats.replace("\n", "\n> ")
        lines.append(f"> ⚠ Caveats: {quoted}")
        lines.append("")

    prose_sections = (
        ("## Executive Summary (EN)", "executive_summary_en", "_no summary_"),
        ("## Sumário Executivo (PT-BR)", "executive_summary_pt_br", "_sem sumário_"),
        ("## Narrative Arc (EN)", "narrative_arc_en", "_no narrative_"),
        ("## Arco Narrativo (PT-BR)", "narrative_arc_pt_br", "_sem narrativa_"),
    )
    for heading, key, placeholder in prose_sections:
        lines.append(heading)
        lines.append("")
        lines.append(_as_text(synthesis.get(key), placeholder))
        lines.append("")

    characters = _dict_items(synthesis.get("central_characters"))
    if characters:
        lines.append("## Central Characters")
        lines.append("")
        for person in characters:
            lines.append(f"### {_as_text(person.get('name'), '?')}")
            role = _as_text(person.get("role"))
            arc = _as_text(person.get("arc"))
            pages = _as_text(person.get("pages"))
            if role:
                lines.append(f"- **Role**: {role}")
            if arc:
                lines.append(f"- **Arc**: {arc}")
            if pages:
                lines.append(f"- **Pages**: `{pages}`")
            lines.append("")

    events = _dict_items(synthesis.get("key_events_timeline"))
    if events:
        lines.append("## Key Events Timeline")
        lines.append("")
        lines.append("| Date | Event | Pages |")
        lines.append("|---|---|---|")
        for event in events:
            date_cell = _table_cell(event.get("date"), "?")
            event_cell = _table_cell(event.get("event"), "?")
            pages_cell = _table_cell(event.get("pages"))
            lines.append(f"| {date_cell} | {event_cell} | `{pages_cell}` |")
        lines.append("")

    locations = _dict_items(synthesis.get("key_locations"))
    if locations:
        lines.append("## Key Locations")
        lines.append("")
        for place in locations:
            name = _as_text(place.get("name"), "?")
            pages = _as_text(place.get("pages"))
            significance = _as_text(place.get("significance"))
            lines.append(f"- **{name}** ({pages}): {significance}")
        lines.append("")

    uaps = _dict_items(synthesis.get("uap_objects_described"))
    if uaps:
        lines.append("## UAP Objects Described")
        lines.append("")
        for obj in uaps:
            shape = _as_text(obj.get("shape"), "?")
            color = _as_text(obj.get("color"), "?")
            page = _as_text(obj.get("page"))
            behavior = _as_text(obj.get("behavior"))
            lines.append(f"- **{shape} ({color})** on `{page}`: {behavior}")
        lines.append("")

    lines.append("## Strategic Significance")
    lines.append("")
    lines.append(_as_text(synthesis.get("strategic_significance"), "_no assessment_"))
    lines.append("")
    lines.append("## Significância Estratégica (PT-BR)")
    lines.append("")
    lines.append(_as_text(synthesis.get("strategic_significance_pt_br"), "_sem avaliação_"))
    lines.append("")
    return "\n".join(lines)
def process_doc(doc_id: str, *, skip_existing: bool) -> str:
    """Synthesize one document and rewrite its markdown file.

    Args:
        doc_id: Document identifier (stem of the file under ``DOCS``).
        skip_existing: When true, leave documents whose frontmatter already
            has ``synthesis_model`` untouched.

    Returns:
        One of ``"missing"`` (no document file), ``"skip-existing"``,
        ``"no-payload"`` (nothing to send to the model), ``"error"`` (model
        call or rendering failed; the file is left unchanged) or ``"ok"``.
    """
    doc_path = DOCS / f"{doc_id}.md"
    if not doc_path.exists():
        return "missing"
    # The old body is discarded on purpose: the synthesis replaces it.
    fm, _ = read_fm(doc_path)
    if skip_existing and fm.get("synthesis_model"):
        return "skip-existing"
    print(f"{doc_id} ({fm.get('page_count', '?')} pages)", flush=True)
    assembled = assemble_doc_payload(doc_id)
    if not assembled:
        return "no-payload"
    payload, meta = assembled
    print(f" input: {meta['input_chars']} chars from {meta['page_count']} pages", flush=True)

    t0 = time.monotonic()
    try:
        synthesis = call_sonnet(payload)
    except Exception as e:
        print(f" ✗ Sonnet failed: {type(e).__name__}: {e}", flush=True)
        return "error"
    dt = time.monotonic() - t0

    # Model output is untrusted: a malformed reply must fail this document
    # only, not abort the whole batch, and must not touch the file.
    try:
        if not isinstance(synthesis, dict):
            raise TypeError(f"expected a JSON object, got {type(synthesis).__name__}")
        new_body = render_body(synthesis)
    except Exception as e:
        print(f" ✗ render failed: {type(e).__name__}: {e}", flush=True)
        return "error"

    fm["synthesis_model"] = "claude-sonnet-4-6"
    fm["synthesis_at"] = utc_now_iso()
    fm["synthesis_confidence_band"] = synthesis.get("synthesis_confidence_band")
    fm["central_characters_count"] = len(synthesis.get("central_characters") or [])
    fm["key_events_count"] = len(synthesis.get("key_events_timeline") or [])
    write_fm(doc_path, fm, new_body)
    print(f" ✓ wrote synthesis ({dt:.1f}s)", flush=True)
    return "ok"
def main():
    """Command-line entry point: synthesize one document or the whole corpus.

    Requires ``--doc-id`` or ``--all``. Prints a per-status tally at the end
    and, when at least one document was synthesized, appends a summary entry
    to ``LOG_PATH``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id")
    ap.add_argument("--all", action="store_true")
    ap.add_argument("--max", type=int, default=0)
    ap.add_argument("--skip-existing", action="store_true")
    ap.add_argument("--workers", type=int, default=1, help="parallel workers (default 1; Max 20x rate-limits Sonnet)")
    args = ap.parse_args()

    if args.doc_id:
        targets = [args.doc_id]
    elif args.all:
        targets = sorted(p.stem for p in DOCS.glob("*.md"))
    else:
        ap.error("provide --doc-id or --all")
    # Only a positive cap applies; a negative value would slice from the end.
    if args.max > 0:
        targets = targets[:args.max]

    print(f"Synthesizing {len(targets)} document(s) with claude-sonnet-4-6")
    stats = {"ok": 0, "error": 0, "skip-existing": 0, "no-payload": 0, "missing": 0}

    def record(doc_id, get_outcome):
        """Tally one document; an unexpected exception counts as an error."""
        try:
            outcome = get_outcome()
        except Exception as e:
            sys.stderr.write(f"{doc_id}: {e}\n")
            outcome = "error"
        stats[outcome] = stats.get(outcome, 0) + 1

    if args.workers <= 1:
        # Same failure policy as the parallel path: one bad document must not
        # abort the run and lose the summary/log entry for the finished ones.
        for d in targets:
            record(d, lambda d=d: process_doc(d, skip_existing=args.skip_existing))
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) as pool:
            futs = {pool.submit(process_doc, d, skip_existing=args.skip_existing): d for d in targets}
            for fut in concurrent.futures.as_completed(futs):
                record(futs[fut], fut.result)

    print(f"\nDone. {stats}")
    if stats.get("ok"):
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(
                f"\n## {utc_now_iso()} — DOCUMENT SYNTHESIS (Sonnet 4.6)\n"
                f"- script: scripts/24-document-synthesis.py\n"
                f"- docs_synthesized: {stats['ok']}\n"
                f"- errors: {stats.get('error', 0)}\n"
            )


if __name__ == "__main__":
    main()