#!/usr/bin/env python3 """ 34-generate-doc-pitches.py — Generate Johnny Harris-style "enthusiast pitches" (50-150 words, PT-BR + EN) for every document in wiki/documents/*.md. Each pitch is injected into the doc's frontmatter as: enthusiast_pitch_pt_br: "..." enthusiast_pitch_en: "..." Style guide (encoded in the prompt): - Mystery hook opening (date + place) - Concrete sensory details, real witness names with credentials - Staccato pacing, repetition for emphasis - Cliffhanger question at the end - Length adapts to doc richness: single dense case → ~140w · multi-case → focus on pattern + zoom on one · sparse doc → 50-80w Pattern: each doc in its OWN `claude -p` subprocess (clean context). Workers parallel for throughput. Idempotent: skips docs that already have pitch. Usage: ./34-generate-doc-pitches.py # all docs missing pitch ./34-generate-doc-pitches.py --workers 4 ./34-generate-doc-pitches.py --doc-id doc-X # single doc ./34-generate-doc-pitches.py --force # regenerate even if exists ./34-generate-doc-pitches.py --model haiku # cheaper, faster """ from __future__ import annotations import argparse import json import os import re import subprocess import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from threading import Lock try: import yaml except ImportError: sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1) UFO_ROOT = Path("/Users/guto/ufo") WIKI_DOCS = UFO_ROOT / "wiki" / "documents" LOG_DIR = UFO_ROOT / "raw" / "_pitch-generation" LOG_DIR.mkdir(parents=True, exist_ok=True) PROGRESS_LOG = LOG_DIR / "progress.jsonl" progress_lock = Lock() quota_exhausted = False # detected globally → stops new spawns SYSTEM_PROMPT = """You are writing in the voice of JOHNNY HARRIS — independent journalist, Vox/YouTube-style visual storyteller. Brazilian Portuguese (pt-br, NOT European). Companion piece for an English version. Your goal: produce a SHORT (50-150 words) "enthusiast pitch" for a single declassified UAP/UFO document, designed to hook a curious lay reader on a website card. STYLE RULES: 1. Open with a MYSTERY HOOK — date + place, concrete and grounded ("24 de abril de 1964, fim de tarde. Socorro, Novo México.") 2. Use staccato sentences. Forward motion. Plain language, no jargon. 3. Anchor in specific details: witness names + credentials, altitudes, coordinates, formal stamps (CONFIDENTIAL/RESTRICTED), unit numbers. 4. Repetition for emphasis. ("Sem som. Sem rastro.") 5. End with a CLIFFHANGER question or final stamp/document marker. Never a summary sentence. 6. Bold key facts with **markdown** if helpful (3-5 max). 7. Preserve verbatim quotes from the source in English when they're vivid (e.g., RESTRICTED, callsigns, military jargon). LENGTH ADAPTS TO DOC RICHNESS: - Sparse doc / form fragment: 50-80 words - One dense case: 100-150 words - Multi-case doc: lead with scale ("100 incidentes em uma pasta"), zoom on ONE vivid case, signal the rest ("e há mais 99 desses"), pattern recognition, final question. OUTPUT FORMAT — return EXACTLY this JSON, nothing else: { "pitch_pt_br": "...", "pitch_en": "..." } Both versions should hit roughly the same word count. PT-BR is the primary; EN is a faithful adaptation, NOT a literal translation.""" USER_PROMPT_TEMPLATE = """Generate the enthusiast pitch for this declassified document: DOC ID: {doc_id} TITLE: {canonical_title} COLLECTION: {collection} PAGES: {page_count} CLASSIFICATION: {classification} DOCUMENT BODY (truncated to first 6000 chars — focus on substantive content): {body} Return the JSON now.""" def utc_iso() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def append_jsonl(path: Path, record: dict) -> None: with progress_lock: with path.open("a", encoding="utf-8") as fh: fh.write(json.dumps(record, ensure_ascii=False) + "\n") def read_doc(doc_path: Path) -> tuple[dict, str, str]: raw = doc_path.read_text(encoding="utf-8") if not raw.startswith("---"): return {}, "", raw end = raw.find("---", 4) if end < 0: return {}, "", raw fm_text = raw[3:end].strip() body = raw[end + 3 :].lstrip("\n") try: fm = yaml.safe_load(fm_text) or {} except yaml.YAMLError: fm = {} return fm, fm_text, body def write_doc(doc_path: Path, fm: dict, body: str) -> None: new_yaml = yaml.safe_dump(fm, sort_keys=False, allow_unicode=True, width=120) new_raw = f"---\n{new_yaml}---\n{body}" doc_path.write_text(new_raw, encoding="utf-8") def call_claude(doc_id: str, fm: dict, body: str, model: str, timeout_s: int) -> tuple[bool, dict]: """Call `claude -p` for ONE pitch. Returns (success, payload).""" global quota_exhausted if quota_exhausted: return False, {"error": "quota_exhausted_early_abort"} prompt = USER_PROMPT_TEMPLATE.format( doc_id=doc_id, canonical_title=fm.get("canonical_title") or doc_id, collection=fm.get("collection") or "—", page_count=fm.get("page_count") or "?", classification=fm.get("highest_classification") or fm.get("classification") or "—", body=body[:6000], ) cmd = [ "claude", "-p", "--model", model, "--output-format", "json", "--max-turns", "2", "--system-prompt", SYSTEM_PROMPT, "--", prompt, ] try: proc = subprocess.run( cmd, capture_output=True, text=True, env={**os.environ}, check=False, timeout=timeout_s, ) except subprocess.TimeoutExpired: return False, {"error": "timeout", "wall_seconds": timeout_s} if proc.returncode != 0: excerpt = (proc.stdout or "")[-500:] if "monthly usage limit" in excerpt.lower() or "usage limit" in excerpt.lower(): quota_exhausted = True return False, {"error": "quota_exhausted", "result_excerpt": excerpt} return False, {"error": "rc_nonzero", "rc": proc.returncode, "stderr": (proc.stderr or "")[-500:]} try: cli = json.loads(proc.stdout) except json.JSONDecodeError: return False, {"error": "cli_json_parse", "raw": proc.stdout[-500:]} result = cli.get("result", "") if not result: return False, {"error": "empty_result", "cli": cli} # Try multiple strategies to extract the pitches robustly payload = None # Strategy 1: try parsing the whole result as JSON try: payload = json.loads(result.strip()) except json.JSONDecodeError: pass # Strategy 2: regex for the two fields directly (handles unescaped chars in values) if not payload: # Match: "pitch_pt_br": "" # We use a more flexible approach: split on the field names pt_match = re.search( r'"pitch_pt_br"\s*:\s*"((?:[^"\\]|\\.)*)"\s*,\s*"pitch_en"', result, re.DOTALL ) en_match = re.search( r'"pitch_en"\s*:\s*"((?:[^"\\]|\\.)*)"\s*[},]', result, re.DOTALL ) if pt_match and en_match: pt = pt_match.group(1).replace('\\"', '"').replace('\\n', '\n').replace('\\\\', '\\') en = en_match.group(1).replace('\\"', '"').replace('\\n', '\n').replace('\\\\', '\\') payload = {"pitch_pt_br": pt, "pitch_en": en} # Strategy 3: balanced-brace extraction if not payload: try: start = result.index("{") depth = 0 for i in range(start, len(result)): if result[i] == "{": depth += 1 elif result[i] == "}": depth -= 1 if depth == 0: try: payload = json.loads(result[start : i + 1]) except json.JSONDecodeError: pass break except ValueError: pass if not payload: return False, {"error": "no_extractable_json", "result_excerpt": result[:600]} if "pitch_pt_br" not in payload or "pitch_en" not in payload: return False, {"error": "missing_fields", "payload": payload} return True, { "pitch_pt_br": payload["pitch_pt_br"].strip(), "pitch_en": payload["pitch_en"].strip(), "cost_usd": cli.get("total_cost_usd"), "num_turns": cli.get("num_turns"), "usage": cli.get("usage"), } def word_count(text: str) -> int: return len([w for w in re.split(r"\s+", text) if w]) def process_doc(doc_id: str, force: bool, model: str, timeout_s: int) -> dict: """Generate + inject pitch for ONE doc.""" doc_path = WIKI_DOCS / f"{doc_id}.md" if not doc_path.exists(): return {"doc_id": doc_id, "success": False, "error": "doc_not_found"} fm, _, body = read_doc(doc_path) if not force and fm.get("enthusiast_pitch_pt_br") and fm.get("enthusiast_pitch_en"): return {"doc_id": doc_id, "success": True, "skipped": True, "reason": "already_has_pitch"} t0 = time.time() ok, result = call_claude(doc_id, fm, body, model, timeout_s) wall = round(time.time() - t0, 1) rec = { "doc_id": doc_id, "started_at": utc_iso(), "wall_seconds": wall, "model": model, } if not ok: rec.update({"success": False, **result}) append_jsonl(PROGRESS_LOG, rec) return rec pt = result["pitch_pt_br"] en = result["pitch_en"] rec.update({ "success": True, "pt_words": word_count(pt), "en_words": word_count(en), "cost_usd": result.get("cost_usd"), }) # Validate word count if not (40 <= word_count(pt) <= 200) or not (40 <= word_count(en) <= 200): rec["warning"] = f"word_count_oob pt={word_count(pt)} en={word_count(en)}" # Inject into frontmatter fm["enthusiast_pitch_pt_br"] = pt fm["enthusiast_pitch_en"] = en fm["enthusiast_pitch_generated_at"] = utc_iso() fm["enthusiast_pitch_model"] = model write_doc(doc_path, fm, body) append_jsonl(PROGRESS_LOG, rec) return rec def list_target_docs(only: str | None) -> list[str]: if only: return [only] docs: list[str] = [] for p in sorted(WIKI_DOCS.glob("*.md")): docs.append(p.stem) return docs def main(): ap = argparse.ArgumentParser() ap.add_argument("--workers", type=int, default=4) ap.add_argument("--doc-id", default=None) ap.add_argument("--force", action="store_true") ap.add_argument("--model", default="sonnet", choices=["sonnet", "haiku"]) ap.add_argument("--timeout-per-doc", type=int, default=180) ap.add_argument("--limit", type=int, default=None, help="Smoke test: process at most N") args = ap.parse_args() docs = list_target_docs(args.doc_id) if args.limit: docs = docs[: args.limit] if not args.force: # Skip docs already done keep: list[str] = [] for d in docs: fm, _, _ = read_doc(WIKI_DOCS / f"{d}.md") if not (fm.get("enthusiast_pitch_pt_br") and fm.get("enthusiast_pitch_en")): keep.append(d) skipped = len(docs) - len(keep) docs = keep else: skipped = 0 print(f"=" * 70) print(f" ENTHUSIAST PITCH GENERATOR — {len(docs)} docs queued, {skipped} already done") print(f" workers: {args.workers} · model: {args.model} · timeout: {args.timeout_per_doc}s/doc") print(f" started: {utc_iso()}") print(f"=" * 70) sys.stdout.flush() t0 = time.time() ok = err = 0 total_cost = 0.0 with ThreadPoolExecutor(max_workers=args.workers) as ex: futures = {ex.submit(process_doc, d, args.force, args.model, args.timeout_per_doc): d for d in docs} for fut in as_completed(futures): doc_id = futures[fut] try: r = fut.result() except Exception as e: r = {"doc_id": doc_id, "success": False, "exception": str(e)} if r.get("success"): ok += 1 total_cost += r.get("cost_usd") or 0 marker = "⊘" if r.get("skipped") else "✓" wc = f"pt={r.get('pt_words','?')}w en={r.get('en_words','?')}w" if not r.get("skipped") else "(cached)" print(f" [{ok+err}/{len(docs)}] {marker} {doc_id[:55]} · {wc} · ${r.get('cost_usd') or 0:.3f}") else: err += 1 marker = "💸" if r.get("error") == "quota_exhausted" else "✗" print(f" [{ok+err}/{len(docs)}] {marker} {doc_id[:55]} · {r.get('error')}") sys.stdout.flush() if quota_exhausted: # Cancel pending — early abort for f in futures: if not f.done(): f.cancel() print("\n ⚠ QUOTA EXHAUSTED — aborting. Re-run later.") break print(f"\n{'=' * 70}") print(f" DONE — {ok}/{len(docs)} succeeded · ${total_cost:.2f} · {round(time.time() - t0, 1)}s") print(f"{'=' * 70}") if __name__ == "__main__": main()