disclosure-bureau/scripts/34-generate-doc-pitches.py

378 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
34-generate-doc-pitches.py — Generate Johnny Harris-style "enthusiast pitches"
(50-150 words, PT-BR + EN) for every document in wiki/documents/*.md.
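Each pitch is injected into the doc's frontmatter as:
enthusiast_pitch_pt_br: "..."
enthusiast_pitch_en: "..."
together with two bookkeeping keys:
enthusiast_pitch_generated_at: "<UTC timestamp, YYYY-MM-DDTHH:MM:SSZ>"
enthusiast_pitch_model: "<model alias passed via --model>"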
Style guide (encoded in the prompt):
- Mystery hook opening (date + place)
- Concrete sensory details, real witness names with credentials
- Staccato pacing, repetition for emphasis
- Cliffhanger question at the end
- Length adapts to doc richness: single dense case → ~140w · multi-case → focus on
pattern + zoom on one · sparse doc → 50-80w
Pattern: each doc in its OWN `claude -p` subprocess (clean context). Workers
parallel for throughput. Idempotent: skips docs that already have pitch.
Usage:
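./34-generate-doc-pitches.py # all docs missing pitch
./34-generate-doc-pitches.py --workers 4
./34-generate-doc-pitches.py --doc-id doc-X # single doc
./34-generate-doc-pitches.py --force # regenerate even if exists
./34-generate-doc-pitches.py --model haiku # cheaper, faster
./34-generate-doc-pitches.py --limit 5 # smoke test: process at most 5 docs
./34-generate-doc-pitches.py --timeout-per-doc 300 # per-doc subprocess timeout in seconds (default 180)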
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
try:
import yaml
except ImportError:
sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1)
# Filesystem layout. Every path hangs off one hard-coded absolute project root.
UFO_ROOT = Path("/Users/guto/ufo")
WIKI_DOCS = UFO_ROOT.joinpath("wiki", "documents")
LOG_DIR = UFO_ROOT.joinpath("raw", "_pitch-generation")
PROGRESS_LOG = LOG_DIR.joinpath("progress.jsonl")

# Import-time side effect: the log directory is created as soon as the module loads.
LOG_DIR.mkdir(parents=True, exist_ok=True)

# Held by append_jsonl so worker threads never interleave their log lines.
progress_lock = Lock()

# Flipped to True by call_claude when it sees a usage-limit error; once set,
# later call_claude invocations return immediately instead of spawning.
quota_exhausted = False
# System prompt handed to the `claude` CLI through --system-prompt in call_claude.
# It is passed as-is (never run through str.format), so the literal braces in the
# JSON example below are safe. The JSON keys named here ("pitch_pt_br",
# "pitch_en") are the ones call_claude extracts from the model reply.
SYSTEM_PROMPT = """You are writing in the voice of JOHNNY HARRIS — independent journalist, Vox/YouTube-style visual storyteller. Brazilian Portuguese (pt-br, NOT European). Companion piece for an English version.
Your goal: produce a SHORT (50-150 words) "enthusiast pitch" for a single declassified UAP/UFO document, designed to hook a curious lay reader on a website card.
STYLE RULES:
1. Open with a MYSTERY HOOK — date + place, concrete and grounded ("24 de abril de 1964, fim de tarde. Socorro, Novo México.")
2. Use staccato sentences. Forward motion. Plain language, no jargon.
3. Anchor in specific details: witness names + credentials, altitudes, coordinates, formal stamps (CONFIDENTIAL/RESTRICTED), unit numbers.
4. Repetition for emphasis. ("Sem som. Sem rastro.")
5. End with a CLIFFHANGER question or final stamp/document marker. Never a summary sentence.
6. Bold key facts with **markdown** if helpful (3-5 max).
7. Preserve verbatim quotes from the source in English when they're vivid (e.g., RESTRICTED, callsigns, military jargon).
LENGTH ADAPTS TO DOC RICHNESS:
- Sparse doc / form fragment: 50-80 words
- One dense case: 100-150 words
- Multi-case doc: lead with scale ("100 incidentes em uma pasta"), zoom on ONE vivid case, signal the rest ("e há mais 99 desses"), pattern recognition, final question.
OUTPUT FORMAT — return EXACTLY this JSON, nothing else:
{
"pitch_pt_br": "...",
"pitch_en": "..."
}
Both versions should hit roughly the same word count. PT-BR is the primary; EN is a faithful adaptation, NOT a literal translation."""
# Per-document user prompt, filled in by call_claude with str.format.
# Placeholders: doc_id, canonical_title, collection, page_count, classification, body.
# The "6000 chars" wording below mirrors the body[:6000] slice applied in
# call_claude; keep the two in sync if the truncation length changes.
USER_PROMPT_TEMPLATE = """Generate the enthusiast pitch for this declassified document:
DOC ID: {doc_id}
TITLE: {canonical_title}
COLLECTION: {collection}
PAGES: {page_count}
CLASSIFICATION: {classification}
DOCUMENT BODY (truncated to first 6000 chars — focus on substantive content):
{body}
Return the JSON now."""
def utc_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    import datetime as _dt

    now_utc = _dt.datetime.now(tz=_dt.timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def append_jsonl(path: Path, record: dict) -> None:
    """Append ``record`` to ``path`` as a single JSON line.

    The module-level ``progress_lock`` is held for the whole open/write so
    that concurrent callers cannot interleave their lines. Non-ASCII
    characters are written as-is (UTF-8), not ``\\u``-escaped.
    """
    with progress_lock, path.open("a", encoding="utf-8") as sink:
        serialized = json.dumps(record, ensure_ascii=False)
        sink.write(f"{serialized}\n")
def read_doc(doc_path: Path) -> tuple[dict, str, str]:
    """Split a markdown file into ``(frontmatter, frontmatter_text, body)``.

    The file is expected to open with a ``---`` line, followed by YAML, and a
    closing line that consists solely of ``---`` (trailing spaces/tabs are
    tolerated). The closing delimiter is matched on a line boundary so that
    a ``---`` occurring inside a YAML value is not mistaken for the end of
    the frontmatter.

    Returns:
        ``fm``      – the parsed mapping; ``{}`` when the YAML is empty,
                      fails to parse, or is not a mapping.
        ``fm_text`` – the raw frontmatter text (stripped); ``""`` when the
                      file has no frontmatter block. Callers can compare
                      ``fm`` and ``fm_text`` to detect a parse failure.
        ``body``    – everything after the closing delimiter with leading
                      newlines removed, or the whole file when no complete
                      frontmatter block is found.
    """
    raw = doc_path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, "", raw

    first_newline = raw.find("\n")
    if first_newline < 0:
        # Only an opening marker, nothing after it.
        return {}, "", raw

    # Start searching on the line after the opening marker; with MULTILINE,
    # "^" only matches right after a newline, i.e. at a real line start.
    closing = re.compile(r"^---[ \t]*$", re.MULTILINE).search(raw, first_newline + 1)
    if closing is None:
        return {}, "", raw

    fm_text = raw[3 : closing.start()].strip()
    body = raw[closing.end() :].lstrip("\n")
    try:
        loaded = yaml.safe_load(fm_text)
    except yaml.YAMLError:
        loaded = None
    # Callers use fm.get(...); a YAML list or scalar would crash them.
    fm = loaded if isinstance(loaded, dict) else {}
    return fm, fm_text, body
def write_doc(doc_path: Path, fm: dict, body: str) -> None:
    """Overwrite ``doc_path`` with ``fm`` as YAML frontmatter followed by ``body``.

    The mapping is dumped in insertion order (``sort_keys=False``) with
    non-ASCII characters kept readable, and wrapped between ``---`` lines.
    """
    front = yaml.safe_dump(fm, sort_keys=False, allow_unicode=True, width=120)
    pieces = ("---\n", front, "---\n", body)
    doc_path.write_text("".join(pieces), encoding="utf-8")
# Single-character JSON escapes understood by the lenient fallback in
# _decode_json_fragment; any other escape sequence is left untouched.
_JSON_SIMPLE_ESCAPES = {
    '"': '"',
    "\\": "\\",
    "/": "/",
    "b": "\b",
    "f": "\f",
    "n": "\n",
    "r": "\r",
    "t": "\t",
}


def _decode_json_fragment(fragment: str) -> str:
    """Decode the inside of a JSON string literal captured by a regex."""
    try:
        # strict=False tolerates raw control characters (e.g. literal
        # newlines) that models sometimes emit inside string values.
        return json.loads(f'"{fragment}"', strict=False)
    except json.JSONDecodeError:
        # Some escape is invalid JSON. Unescape in ONE left-to-right pass so
        # an escaped backslash followed by "n" is not turned into a newline.
        return re.sub(
            r"\\(.)",
            lambda m: _JSON_SIMPLE_ESCAPES.get(m.group(1), m.group(0)),
            fragment,
            flags=re.DOTALL,
        )


def _extract_pitch_payload(result: str) -> dict | None:
    """Find the pitch JSON object inside the model's reply text, or None."""
    wanted = ("pitch_pt_br", "pitch_en")

    # Strategy 1: the whole reply is the JSON object.
    try:
        whole = json.loads(result.strip())
    except json.JSONDecodeError:
        whole = None
    if isinstance(whole, dict) and whole:
        return whole

    # Strategy 2: pull the two string values out directly. This survives
    # code fences, surrounding prose and raw newlines inside the values.
    pt_match = re.search(
        r'"pitch_pt_br"\s*:\s*"((?:[^"\\]|\\.)*)"\s*,\s*"pitch_en"',
        result, re.DOTALL
    )
    en_match = re.search(
        r'"pitch_en"\s*:\s*"((?:[^"\\]|\\.)*)"\s*[},]',
        result, re.DOTALL
    )
    if pt_match and en_match:
        return {
            "pitch_pt_br": _decode_json_fragment(pt_match.group(1)),
            "pitch_en": _decode_json_fragment(en_match.group(1)),
        }

    # Strategy 3: let the JSON decoder parse an object starting at each "{".
    # Unlike manual brace counting this is not confused by braces that
    # appear inside string values.
    decoder = json.JSONDecoder(strict=False)
    first_dict = None
    brace = result.find("{")
    while brace != -1:
        try:
            candidate, _ = decoder.raw_decode(result, brace)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict) and candidate:
            if all(key in candidate for key in wanted):
                return candidate
            if first_dict is None:
                first_dict = candidate
        brace = result.find("{", brace + 1)
    # A dict without the wanted keys is still returned so the caller can
    # report it as "missing_fields" together with what was actually received.
    return first_dict


def call_claude(doc_id: str, fm: dict, body: str, model: str, timeout_s: int) -> tuple[bool, dict]:
    """Call `claude -p` for ONE pitch. Returns (success, payload).

    Builds the user prompt from ``fm`` and the first 6000 characters of
    ``body``, runs the ``claude`` CLI with JSON output, and extracts the
    two pitch strings from the ``result`` field of the CLI's JSON.

    On success the payload holds ``pitch_pt_br``, ``pitch_en`` (both
    stripped), plus ``cost_usd``, ``num_turns`` and ``usage`` copied from
    the CLI output. On failure the payload holds an ``error`` code:
    ``quota_exhausted_early_abort``, ``claude_cli_not_found``, ``timeout``,
    ``quota_exhausted``, ``rc_nonzero``, ``cli_json_parse``,
    ``empty_result``, ``no_extractable_json`` or ``missing_fields`` (a key
    is absent, not a string, or blank).

    Side effect: sets the module-level ``quota_exhausted`` flag when a
    failed run mentions a usage limit, so later calls return immediately.
    """
    global quota_exhausted
    if quota_exhausted:
        return False, {"error": "quota_exhausted_early_abort"}

    prompt = USER_PROMPT_TEMPLATE.format(
        doc_id=doc_id,
        canonical_title=fm.get("canonical_title") or doc_id,
        collection=fm.get("collection") or "",
        page_count=fm.get("page_count") or "?",
        classification=fm.get("highest_classification") or fm.get("classification") or "",
        body=body[:6000],
    )
    cmd = [
        "claude", "-p",
        "--model", model,
        "--output-format", "json",
        "--max-turns", "2",
        "--system-prompt", SYSTEM_PROMPT,
        "--",  # everything after this is the prompt, even if it starts with "-"
        prompt,
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Decode explicitly as UTF-8 so accented text does not depend on
            # the caller's locale; undecodable bytes are replaced, not fatal.
            encoding="utf-8",
            errors="replace",
            check=False,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired:
        return False, {"error": "timeout", "wall_seconds": timeout_s}
    except FileNotFoundError:
        return False, {"error": "claude_cli_not_found"}

    if proc.returncode != 0:
        stdout_tail = (proc.stdout or "")[-500:]
        stderr_tail = (proc.stderr or "")[-500:]
        # The limit message may arrive on either stream.
        if "usage limit" in f"{stdout_tail}\n{stderr_tail}".lower():
            quota_exhausted = True
            return False, {"error": "quota_exhausted", "result_excerpt": stdout_tail or stderr_tail}
        return False, {"error": "rc_nonzero", "rc": proc.returncode, "stderr": stderr_tail}

    try:
        cli = json.loads(proc.stdout)
    except json.JSONDecodeError:
        cli = None
    if not isinstance(cli, dict):
        return False, {"error": "cli_json_parse", "raw": (proc.stdout or "")[-500:]}

    result = cli.get("result", "")
    if not isinstance(result, str) or not result:
        return False, {"error": "empty_result", "cli": cli}

    payload = _extract_pitch_payload(result)
    if payload is None:
        return False, {"error": "no_extractable_json", "result_excerpt": result[:600]}

    pt = payload.get("pitch_pt_br")
    en = payload.get("pitch_en")
    if not (isinstance(pt, str) and pt.strip() and isinstance(en, str) and en.strip()):
        return False, {"error": "missing_fields", "payload": payload}

    return True, {
        "pitch_pt_br": pt.strip(),
        "pitch_en": en.strip(),
        "cost_usd": cli.get("total_cost_usd"),
        "num_turns": cli.get("num_turns"),
        "usage": cli.get("usage"),
    }
def word_count(text: str) -> int:
    """Count the whitespace-separated tokens in ``text``."""
    return sum(1 for _ in re.finditer(r"\S+", text))
def process_doc(doc_id: str, force: bool, model: str, timeout_s: int) -> dict:
    """Generate + inject pitch for ONE doc.

    Reads ``WIKI_DOCS/<doc_id>.md``, asks ``call_claude`` for the two
    pitches, writes them (plus generation timestamp and model) into the
    frontmatter and appends a record to ``PROGRESS_LOG``.

    Returns a record dict that always has ``doc_id`` and ``success``.
    Early exits (not logged): ``doc_not_found``, ``frontmatter_unparseable``
    and the ``skipped`` case when both pitches already exist and ``force``
    is false. Word counts outside 40-200 only add a ``warning``; the pitch
    is still written.
    """
    doc_path = WIKI_DOCS / f"{doc_id}.md"
    if not doc_path.exists():
        return {"doc_id": doc_id, "success": False, "error": "doc_not_found"}

    fm, fm_text, body = read_doc(doc_path)
    # Frontmatter text exists but did not yield a usable mapping. Writing
    # back would replace it with just the pitch keys, so refuse to touch
    # the file rather than silently discard its metadata.
    if not isinstance(fm, dict) or (not fm and fm_text.strip()):
        return {"doc_id": doc_id, "success": False, "error": "frontmatter_unparseable"}

    if not force and fm.get("enthusiast_pitch_pt_br") and fm.get("enthusiast_pitch_en"):
        return {"doc_id": doc_id, "success": True, "skipped": True, "reason": "already_has_pitch"}

    # Stamp the start BEFORE the (slow) CLI call so the field means what it says.
    started_at = utc_iso()
    t0 = time.monotonic()
    ok, result = call_claude(doc_id, fm, body, model, timeout_s)
    wall = round(time.monotonic() - t0, 1)

    rec = {
        "doc_id": doc_id,
        "started_at": started_at,
        "wall_seconds": wall,
        "model": model,
    }
    if not ok:
        rec.update({"success": False, **result})
        append_jsonl(PROGRESS_LOG, rec)
        return rec

    pt = result["pitch_pt_br"]
    en = result["pitch_en"]
    pt_words = word_count(pt)
    en_words = word_count(en)
    rec.update({
        "success": True,
        "pt_words": pt_words,
        "en_words": en_words,
        "cost_usd": result.get("cost_usd"),
    })
    # Validate word count (soft check: recorded, never blocks the write)
    if not (40 <= pt_words <= 200) or not (40 <= en_words <= 200):
        rec["warning"] = f"word_count_oob pt={pt_words} en={en_words}"

    # Inject into frontmatter
    fm["enthusiast_pitch_pt_br"] = pt
    fm["enthusiast_pitch_en"] = en
    fm["enthusiast_pitch_generated_at"] = utc_iso()
    fm["enthusiast_pitch_model"] = model
    write_doc(doc_path, fm, body)
    append_jsonl(PROGRESS_LOG, rec)
    return rec
def list_target_docs(only: str | None) -> list[str]:
    """Return the doc ids to process.

    A truthy ``only`` yields just that id (its existence is not checked
    here); otherwise every ``*.md`` file directly under ``WIKI_DOCS`` is
    listed by stem, in sorted path order.
    """
    if only:
        return [only]
    return [md_file.stem for md_file in sorted(WIKI_DOCS.glob("*.md"))]
def main():
    """CLI entry point: queue docs, fan out to worker threads, print progress.

    Unless ``--force`` is given, docs that already carry both pitches are
    filtered out first; ``--limit`` is then applied to the remaining queue,
    so a smoke run always gets up to N docs that actually need work. Each
    queued doc goes through ``process_doc`` in a thread pool. When the
    global ``quota_exhausted`` flag is raised, pending futures are
    cancelled and the loop stops.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--workers", type=int, default=4)
    ap.add_argument("--doc-id", default=None)
    ap.add_argument("--force", action="store_true")
    ap.add_argument("--model", default="sonnet", choices=["sonnet", "haiku"])
    ap.add_argument("--timeout-per-doc", type=int, default=180)
    ap.add_argument("--limit", type=int, default=None, help="Smoke test: process at most N")
    args = ap.parse_args()

    # ThreadPoolExecutor rejects max_workers <= 0.
    workers = max(1, args.workers)

    docs = list_target_docs(args.doc_id)
    skipped = 0
    if not args.force:
        # Skip docs already done
        pending: list[str] = []
        for d in docs:
            path = WIKI_DOCS / f"{d}.md"
            # A missing file stays queued so process_doc can report
            # doc_not_found instead of read_doc raising here.
            if path.exists():
                fm, _, _ = read_doc(path)
                if isinstance(fm, dict) and fm.get("enthusiast_pitch_pt_br") and fm.get("enthusiast_pitch_en"):
                    continue
            pending.append(d)
        skipped = len(docs) - len(pending)
        docs = pending
    if args.limit:
        # Applied AFTER the filter: limit counts docs that still need a pitch.
        docs = docs[: args.limit]

    print("=" * 70)
    print(f" ENTHUSIAST PITCH GENERATOR — {len(docs)} docs queued, {skipped} already done")
    print(f" workers: {workers} · model: {args.model} · timeout: {args.timeout_per_doc}s/doc")
    print(f" started: {utc_iso()}")
    print("=" * 70)
    sys.stdout.flush()

    t0 = time.time()
    ok = err = 0
    total_cost = 0.0
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = {ex.submit(process_doc, d, args.force, args.model, args.timeout_per_doc): d for d in docs}
        for fut in as_completed(futures):
            doc_id = futures[fut]
            try:
                r = fut.result()
            except Exception as e:
                # Top-level boundary: one bad doc must not kill the run.
                # Fill "error" so the progress line shows what happened.
                r = {
                    "doc_id": doc_id,
                    "success": False,
                    "error": f"exception: {type(e).__name__}: {e}",
                    "exception": str(e),
                }
            if r.get("success"):
                ok += 1
                total_cost += r.get("cost_usd") or 0
                marker = "↷" if r.get("skipped") else "✓"
                wc = f"pt={r.get('pt_words','?')}w en={r.get('en_words','?')}w" if not r.get("skipped") else "(cached)"
                print(f" [{ok+err}/{len(docs)}] {marker} {doc_id[:55]} · {wc} · ${r.get('cost_usd') or 0:.3f}")
            else:
                err += 1
                marker = "💸" if r.get("error") == "quota_exhausted" else "✗"
                print(f" [{ok+err}/{len(docs)}] {marker} {doc_id[:55]} · {r.get('error')}")
            sys.stdout.flush()
            if quota_exhausted:
                # Cancel pending — early abort
                for f in futures:
                    if not f.done():
                        f.cancel()
                print("\n ⚠ QUOTA EXHAUSTED — aborting. Re-run later.")
                break

    print(f"\n{'=' * 70}")
    print(f" DONE — {ok}/{len(docs)} succeeded · ${total_cost:.2f} · {round(time.time() - t0, 1)}s")
    print(f"{'=' * 70}")


if __name__ == "__main__":
    main()