disclosure-bureau/scripts/34-generate-doc-pitches.py

379 lines
13 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
34-generate-doc-pitches.py — Generate Johnny Harris-style "enthusiast pitches"
(50-150 words, PT-BR + EN) for every document in wiki/documents/*.md.
Each pitch is injected into the doc's frontmatter as:
enthusiast_pitch_pt_br: "..."
enthusiast_pitch_en: "..."
Style guide (encoded in the prompt):
- Mystery hook opening (date + place)
- Concrete sensory details, real witness names with credentials
- Staccato pacing, repetition for emphasis
- Cliffhanger question at the end
- Length adapts to doc richness: single dense case ~140w · multi-case: focus on
the pattern + zoom on one case · sparse doc 50-80w
Pattern: each doc runs in its OWN `claude -p` subprocess (clean context). Workers
run in parallel for throughput. Idempotent: skips docs that already have a pitch.
Usage:
./34-generate-doc-pitches.py # all docs missing pitch
./34-generate-doc-pitches.py --workers 4
./34-generate-doc-pitches.py --doc-id doc-X # single doc
./34-generate-doc-pitches.py --force # regenerate even if exists
./34-generate-doc-pitches.py --model haiku # cheaper, faster
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
try:
import yaml
except ImportError:
sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1)
# Repository root. Defaults to the original checkout location; set the
# UFO_ROOT environment variable to run against a different checkout.
UFO_ROOT = Path(os.environ.get("UFO_ROOT") or "/Users/guto/ufo")
# Markdown documents whose frontmatter receives the generated pitches.
WIKI_DOCS = UFO_ROOT / "wiki" / "documents"
# Directory holding this script's run log; created at import time if missing.
LOG_DIR = UFO_ROOT / "raw" / "_pitch-generation"
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Append-only JSONL file with one record per generation attempt.
PROGRESS_LOG = LOG_DIR / "progress.jsonl"
# Serializes appends to PROGRESS_LOG across worker threads.
progress_lock = Lock()
# Set to True once a worker sees a usage-limit error; stops new CLI spawns.
quota_exhausted = False
# System prompt handed to the `claude` CLI through --system-prompt in
# call_claude(). It fixes the narrative voice, the style rules, the length
# targets and the exact JSON object the model is asked to return.
SYSTEM_PROMPT = """You are writing in the voice of JOHNNY HARRIS — independent journalist, Vox/YouTube-style visual storyteller. Brazilian Portuguese (pt-br, NOT European). Companion piece for an English version.
Your goal: produce a SHORT (50-150 words) "enthusiast pitch" for a single declassified UAP/UFO document, designed to hook a curious lay reader on a website card.
STYLE RULES:
1. Open with a MYSTERY HOOK date + place, concrete and grounded ("24 de abril de 1964, fim de tarde. Socorro, Novo México.")
2. Use staccato sentences. Forward motion. Plain language, no jargon.
3. Anchor in specific details: witness names + credentials, altitudes, coordinates, formal stamps (CONFIDENTIAL/RESTRICTED), unit numbers.
4. Repetition for emphasis. ("Sem som. Sem rastro.")
5. End with a CLIFFHANGER question or final stamp/document marker. Never a summary sentence.
6. Bold key facts with **markdown** if helpful (3-5 max).
7. Preserve verbatim quotes from the source in English when they're vivid (e.g., RESTRICTED, callsigns, military jargon).
LENGTH ADAPTS TO DOC RICHNESS:
- Sparse doc / form fragment: 50-80 words
- One dense case: 100-150 words
- Multi-case doc: lead with scale ("100 incidentes em uma pasta"), zoom on ONE vivid case, signal the rest ("e há mais 99 desses"), pattern recognition, final question.
OUTPUT FORMAT return EXACTLY this JSON, nothing else:
{
"pitch_pt_br": "...",
"pitch_en": "..."
}
Both versions should hit roughly the same word count. PT-BR is the primary; EN is a faithful adaptation, NOT a literal translation."""
# Per-document user prompt. call_claude() fills the {placeholders} with
# str.format() using the doc id, frontmatter fields and the truncated body.
USER_PROMPT_TEMPLATE = """Generate the enthusiast pitch for this declassified document:
DOC ID: {doc_id}
TITLE: {canonical_title}
COLLECTION: {collection}
PAGES: {page_count}
CLASSIFICATION: {classification}
DOCUMENT BODY (truncated to first 6000 chars focus on substantive content):
{body}
Return the JSON now."""
def utc_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    import datetime as _dt

    now_utc = _dt.datetime.now(tz=_dt.timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def append_jsonl(path: Path, record: dict) -> None:
    """Append ``record`` to ``path`` as one JSON line.

    The module-level ``progress_lock`` is held while the file is open, so
    appends coming from different worker threads are serialized. Non-ASCII
    characters are written as-is (``ensure_ascii=False``).
    """
    with progress_lock, path.open("a", encoding="utf-8") as sink:
        line = json.dumps(record, ensure_ascii=False)
        sink.write(line + "\n")
def read_doc(doc_path: Path) -> tuple[dict, str, str]:
    """Split a markdown document into its YAML frontmatter and its body.

    Args:
        doc_path: Path of the markdown file to read (decoded as UTF-8).

    Returns:
        ``(fm, fm_text, body)`` where ``fm`` is the parsed frontmatter
        mapping, ``fm_text`` is the raw frontmatter text without the ``---``
        fences, and ``body`` is everything after the closing fence with
        leading newlines removed.

        When the file does not start with ``---`` or has no closing fence,
        the result is ``({}, "", raw)``. When the frontmatter is not valid
        YAML or is not a mapping, ``fm`` is ``{}`` while ``fm_text`` still
        holds the raw text, so callers can tell "unparseable" from "absent".
    """
    raw = doc_path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, "", raw

    opening_end = raw.find("\n")
    if opening_end < 0:
        return {}, "", raw

    # The closing fence must be a line of its own. A plain substring search
    # would stop at any "---" that happens to appear inside a frontmatter
    # value and cut the frontmatter in the wrong place.
    fence = re.compile(r"^---[ \t]*\r?$", re.MULTILINE)
    closing = fence.search(raw, opening_end + 1)
    if closing is None:
        return {}, "", raw

    fm_text = raw[3 : closing.start()].strip()
    body = raw[closing.end() :].lstrip("\n")
    try:
        loaded = yaml.safe_load(fm_text)
    except yaml.YAMLError:
        loaded = None
    # Callers use fm.get(...); anything that is not a mapping (None, a list,
    # a bare scalar) is treated as "no usable frontmatter".
    fm = loaded if isinstance(loaded, dict) else {}
    return fm, fm_text, body
def write_doc(doc_path: Path, fm: dict, body: str) -> None:
    """Overwrite ``doc_path`` with ``fm`` as YAML frontmatter plus ``body``.

    The mapping is dumped in insertion order (``sort_keys=False``), with
    non-ASCII characters kept literal and a line width of 120, and is
    wrapped in ``---`` fence lines ahead of the body.
    """
    rendered_fm = yaml.safe_dump(fm, sort_keys=False, allow_unicode=True, width=120)
    doc_path.write_text(f"---\n{rendered_fm}---\n{body}", encoding="utf-8")
def _decode_json_string(fragment: str) -> str:
    """Decode the inside of a JSON string literal captured by a regex."""
    try:
        # strict=False accepts raw control characters (e.g. literal newlines)
        # inside the string, and the JSON decoder handles every escape form
        # (\", \\, \n, \t, \uXXXX, ...) in the correct order.
        return json.loads(f'"{fragment}"', strict=False)
    except json.JSONDecodeError:
        # Invalid escape sequence: undo only the most common escapes.
        return fragment.replace('\\"', '"').replace("\\n", "\n").replace("\\\\", "\\")


def _extract_pitch_payload(result: str) -> dict | None:
    """Return the JSON object embedded in the model reply, or None."""
    # Strategy 1: the whole reply is the JSON object.
    try:
        candidate = json.loads(result.strip())
    except json.JSONDecodeError:
        candidate = None
    if isinstance(candidate, dict) and candidate:
        return candidate

    # Strategy 2: pull the two fields out with regexes. This copes with
    # surrounding prose/code fences and raw control characters in the values.
    pt_match = re.search(
        r'"pitch_pt_br"\s*:\s*"((?:[^"\\]|\\.)*)"\s*,\s*"pitch_en"',
        result, re.DOTALL
    )
    en_match = re.search(
        r'"pitch_en"\s*:\s*"((?:[^"\\]|\\.)*)"\s*[},]',
        result, re.DOTALL
    )
    if pt_match and en_match:
        return {
            "pitch_pt_br": _decode_json_string(pt_match.group(1)),
            "pitch_en": _decode_json_string(en_match.group(1)),
        }

    # Strategy 3: parse the first brace-balanced span. Braces inside string
    # values are not special-cased, so this is a last resort.
    start = result.find("{")
    if start < 0:
        return None
    depth = 0
    for pos in range(start, len(result)):
        ch = result[pos]
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                try:
                    candidate = json.loads(result[start : pos + 1], strict=False)
                except json.JSONDecodeError:
                    return None
                if isinstance(candidate, dict) and candidate:
                    return candidate
                return None
    return None


def call_claude(doc_id: str, fm: dict, body: str, model: str, timeout_s: int) -> tuple[bool, dict]:
    """Call `claude -p` for ONE pitch. Returns (success, payload).

    Args:
        doc_id: Document identifier; also used as the title fallback.
        fm: Parsed frontmatter of the document (title, collection, ...).
        body: Document body; only the first 6000 characters are sent.
        model: Value passed to the CLI's ``--model`` flag.
        timeout_s: Wall-clock limit for the subprocess, in seconds.

    Returns:
        ``(True, {"pitch_pt_br", "pitch_en", "cost_usd", "num_turns",
        "usage"})`` on success, otherwise ``(False, {"error": <code>, ...})``.
        Error codes: ``quota_exhausted_early_abort``, ``spawn_failed``,
        ``timeout``, ``quota_exhausted``, ``rc_nonzero``, ``cli_json_parse``,
        ``empty_result``, ``no_extractable_json``, ``missing_fields``.

    Side effects:
        Sets the module-level ``quota_exhausted`` flag when the CLI reports a
        usage limit, so that later calls return immediately.
    """
    global quota_exhausted
    if quota_exhausted:
        return False, {"error": "quota_exhausted_early_abort"}

    prompt = USER_PROMPT_TEMPLATE.format(
        doc_id=doc_id,
        canonical_title=fm.get("canonical_title") or doc_id,
        collection=fm.get("collection") or "",
        page_count=fm.get("page_count") or "?",
        classification=fm.get("highest_classification") or fm.get("classification") or "",
        body=body[:6000],
    )
    cmd = [
        "claude", "-p",
        "--model", model,
        "--output-format", "json",
        "--max-turns", "2",
        "--system-prompt", SYSTEM_PROMPT,
        "--",
        prompt,
    ]
    try:
        # The child inherits the parent's environment by default.
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=False,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired:
        return False, {"error": "timeout", "wall_seconds": timeout_s}
    except OSError as exc:
        # E.g. the `claude` binary is missing or not executable. Report it as
        # a normal failure so it is logged like every other error.
        return False, {"error": "spawn_failed", "detail": str(exc)}

    if proc.returncode != 0:
        stdout_tail = (proc.stdout or "")[-500:]
        stderr_tail = (proc.stderr or "")[-500:]
        # The limit message may land on either stream, so check both.
        if "usage limit" in f"{stdout_tail}\n{stderr_tail}".lower():
            quota_exhausted = True
            return False, {"error": "quota_exhausted", "result_excerpt": stdout_tail or stderr_tail}
        return False, {"error": "rc_nonzero", "rc": proc.returncode, "stderr": stderr_tail}

    try:
        cli = json.loads(proc.stdout)
    except json.JSONDecodeError:
        cli = None
    if not isinstance(cli, dict):
        return False, {"error": "cli_json_parse", "raw": (proc.stdout or "")[-500:]}

    result = cli.get("result", "")
    if not isinstance(result, str) or not result:
        return False, {"error": "empty_result", "cli": cli}

    payload = _extract_pitch_payload(result)
    if payload is None:
        return False, {"error": "no_extractable_json", "result_excerpt": result[:600]}

    pt = payload.get("pitch_pt_br")
    en = payload.get("pitch_en")
    # Both pitches must be non-blank strings; anything else would either
    # crash on .strip() or inject an empty pitch into the document.
    if not (isinstance(pt, str) and pt.strip() and isinstance(en, str) and en.strip()):
        return False, {"error": "missing_fields", "payload": payload}

    return True, {
        "pitch_pt_br": pt.strip(),
        "pitch_en": en.strip(),
        "cost_usd": cli.get("total_cost_usd"),
        "num_turns": cli.get("num_turns"),
        "usage": cli.get("usage"),
    }
def word_count(text: str) -> int:
    """Count the whitespace-separated, non-empty tokens in ``text``."""
    pieces = re.split(r"\s+", text)
    return sum(1 for piece in pieces if piece)
def process_doc(doc_id: str, force: bool, model: str, timeout_s: int) -> dict:
    """Generate + inject pitch for ONE doc.

    Args:
        doc_id: Stem of the markdown file under ``WIKI_DOCS``.
        force: Regenerate even when both pitch fields are already present.
        model: Model name forwarded to ``call_claude``.
        timeout_s: Per-document subprocess timeout in seconds.

    Returns:
        A result record. It always has ``doc_id`` and ``success``; failures
        carry ``error``, skipped docs carry ``skipped``/``reason``, and
        generated docs carry word counts and cost. Every generation attempt
        is also appended to ``PROGRESS_LOG``.
    """
    doc_path = WIKI_DOCS / f"{doc_id}.md"
    if not doc_path.exists():
        return {"doc_id": doc_id, "success": False, "error": "doc_not_found"}

    fm, fm_text, body = read_doc(doc_path)

    # read_doc yields an empty mapping when the frontmatter cannot be parsed.
    # Writing the doc back in that state would replace the existing
    # frontmatter with just the pitch keys, so refuse before spending a call.
    if not isinstance(fm, dict) or (fm_text and not fm):
        rec = {"doc_id": doc_id, "success": False, "error": "frontmatter_unparseable"}
        append_jsonl(PROGRESS_LOG, rec)
        return rec

    if not force and fm.get("enthusiast_pitch_pt_br") and fm.get("enthusiast_pitch_en"):
        return {"doc_id": doc_id, "success": True, "skipped": True, "reason": "already_has_pitch"}

    # Stamp the start BEFORE the call so "started_at" is a start time, and
    # use a monotonic clock so the duration is immune to wall-clock jumps.
    started_at = utc_iso()
    t0 = time.monotonic()
    ok, result = call_claude(doc_id, fm, body, model, timeout_s)
    wall = round(time.monotonic() - t0, 1)

    rec = {
        "doc_id": doc_id,
        "started_at": started_at,
        "wall_seconds": wall,
        "model": model,
    }
    if not ok:
        rec.update({"success": False, **result})
        append_jsonl(PROGRESS_LOG, rec)
        return rec

    pt = result["pitch_pt_br"]
    en = result["pitch_en"]
    pt_words = word_count(pt)
    en_words = word_count(en)
    rec.update({
        "success": True,
        "pt_words": pt_words,
        "en_words": en_words,
        "cost_usd": result.get("cost_usd"),
    })
    # Out-of-range lengths are recorded as a warning; the pitch is still kept.
    if not (40 <= pt_words <= 200) or not (40 <= en_words <= 200):
        rec["warning"] = f"word_count_oob pt={pt_words} en={en_words}"

    # Inject into frontmatter
    fm["enthusiast_pitch_pt_br"] = pt
    fm["enthusiast_pitch_en"] = en
    fm["enthusiast_pitch_generated_at"] = utc_iso()
    fm["enthusiast_pitch_model"] = model
    write_doc(doc_path, fm, body)
    append_jsonl(PROGRESS_LOG, rec)
    return rec
def list_target_docs(only: str | None) -> list[str]:
if only:
return [only]
docs: list[str] = []
for p in sorted(WIKI_DOCS.glob("*.md")):
docs.append(p.stem)
return docs
def main():
    """CLI entry point: queue the target docs and generate pitches in parallel.

    Parses the command line, drops docs that already have both pitch fields
    (unless ``--force``), applies ``--limit`` to what is left, then runs
    ``process_doc`` on a thread pool and prints one progress line per doc.
    When a worker reports an exhausted quota, pending docs are cancelled and
    the run stops early.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--workers", type=int, default=4)
    ap.add_argument("--doc-id", default=None)
    ap.add_argument("--force", action="store_true")
    ap.add_argument("--model", default="sonnet", choices=["sonnet", "haiku"])
    ap.add_argument("--timeout-per-doc", type=int, default=180)
    ap.add_argument("--limit", type=int, default=None, help="Smoke test: process at most N")
    args = ap.parse_args()

    docs = list_target_docs(args.doc_id)
    skipped = 0
    if not args.force:
        # Skip docs already done
        pending: list[str] = []
        for d in docs:
            doc_path = WIKI_DOCS / f"{d}.md"
            if not doc_path.exists():
                # Keep it queued so process_doc reports "doc_not_found"
                # instead of this pre-filter crashing on the read.
                pending.append(d)
                continue
            fm, _, _ = read_doc(doc_path)
            if not (isinstance(fm, dict) and fm.get("enthusiast_pitch_pt_br") and fm.get("enthusiast_pitch_en")):
                pending.append(d)
        skipped = len(docs) - len(pending)
        docs = pending
    # Apply the limit AFTER filtering so "--limit N" really queues up to N
    # docs that still need a pitch, even when the first N are already done.
    if args.limit:
        docs = docs[: args.limit]

    print("=" * 70)
    print(f" ENTHUSIAST PITCH GENERATOR — {len(docs)} docs queued, {skipped} already done")
    print(f" workers: {args.workers} · model: {args.model} · timeout: {args.timeout_per_doc}s/doc")
    print(f" started: {utc_iso()}")
    print("=" * 70)
    sys.stdout.flush()

    t0 = time.time()
    ok = err = 0
    total_cost = 0.0
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least one.
    with ThreadPoolExecutor(max_workers=max(1, args.workers)) as ex:
        futures = {ex.submit(process_doc, d, args.force, args.model, args.timeout_per_doc): d for d in docs}
        for fut in as_completed(futures):
            doc_id = futures[fut]
            try:
                r = fut.result()
            except Exception as e:
                r = {"doc_id": doc_id, "success": False, "exception": str(e)}
            if r.get("success"):
                ok += 1
                total_cost += r.get("cost_usd") or 0
                marker = "⏭" if r.get("skipped") else "✓"
                wc = f"pt={r.get('pt_words','?')}w en={r.get('en_words','?')}w" if not r.get("skipped") else "(cached)"
                print(f" [{ok+err}/{len(docs)}] {marker} {doc_id[:55]} · {wc} · ${r.get('cost_usd') or 0:.3f}")
            else:
                err += 1
                marker = "💸" if r.get("error") == "quota_exhausted" else "✗"
                # Results built from a raised exception carry "exception"
                # rather than "error"; show whichever is present.
                reason = r.get("error") or r.get("exception")
                print(f" [{ok+err}/{len(docs)}] {marker} {doc_id[:55]} · {reason}")
            sys.stdout.flush()
            if quota_exhausted:
                # Cancel pending — early abort. Futures that are already
                # running cannot be cancelled; the pool waits for them on exit.
                for f in futures:
                    if not f.done():
                        f.cancel()
                print("\n ⚠ QUOTA EXHAUSTED — aborting. Re-run later.")
                break

    print(f"\n{'=' * 70}")
    print(f" DONE — {ok}/{len(docs)} succeeded · ${total_cost:.2f} · {round(time.time() - t0, 1)}s")
    print(f"{'=' * 70}")


if __name__ == "__main__":
    main()