#!/usr/bin/env python3 """ 28-batch-rebuild-all.py — Batch rebuild ALL declassified UAP/UFO documents into the agentic chunk schema (raw//document.md + chunks/ + _index.json). CRITICAL RULE (user-mandated): each document runs in its OWN fresh `claude -p` subprocess with empty context. Never accumulate multiple docs in one session. Workers parallel at the SUBPROCESS level only — never multi-doc inside one Claude session. Default 2 parallel workers. Usage: ./28-batch-rebuild-all.py # 2 workers, all unbuilt docs ./28-batch-rebuild-all.py --workers 4 ./28-batch-rebuild-all.py --limit 5 # smoke ./28-batch-rebuild-all.py --doc-id doc-X # single doc ./28-batch-rebuild-all.py --resume # skip docs already done ./28-batch-rebuild-all.py --force # rebuild even if exists """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from threading import Lock UFO_ROOT = Path("/Users/guto/ufo") PNG_ROOT = UFO_ROOT / "processing" / "png" RAW_ROOT = UFO_ROOT / "raw" LOG_DIR = RAW_ROOT / "_batch-rebuild" LOG_DIR.mkdir(parents=True, exist_ok=True) PROGRESS_LOG = LOG_DIR / "progress.jsonl" FAILED_LOG = LOG_DIR / "failed.jsonl" SUMMARY_LOG = LOG_DIR / "summary.json" PROMPT = """Rebuild the document `{doc_id}` into `raw/{doc_id}/document.md` using the `doc-rebuilder` subagent. Constraints: - Process ALL {page_count} pages (p001 .. p{page_count:03d}). - The doc-rebuilder agent spawns `page-rebuilder`, `image-analyst`, and `table-stitcher` subagents in parallel via Task. - Output schema: as defined in .claude/agents/doc-rebuilder.md system prompt. - Target files: - `/Users/guto/ufo/raw/{doc_id}/document.md` (assembled master) - `/Users/guto/ufo/raw/{doc_id}/_index.json` (machine-readable chunk index) - `/Users/guto/ufo/raw/{doc_id}/chunks/c*.md` (one file per chunk) - `/Users/guto/ufo/raw/{doc_id}/images/IMG-c*.png` (cropped image regions) - Set frontmatter `build_approach: "subagents"`. - Bilingual EN + Brazilian PT-BR. Preserve UTF-8 accents. Begin. When done, output a single line of stats: pages_done, chunks_total, images_extracted, tables_stitched, ufo_anomalies, cryptid_anomalies, wall_seconds.""" progress_lock = Lock() quota_exhausted = False # global flag: if Anthropic returns "monthly usage limit", stop the batch def utc_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def append_jsonl(path: Path, record: dict) -> None: with progress_lock: with path.open("a", encoding="utf-8") as fh: fh.write(json.dumps(record, ensure_ascii=False) + "\n") def discover_docs() -> list[tuple[str, int]]: """List all doc_ids with their page count from processing/png/.""" out: list[tuple[str, int]] = [] for doc_dir in sorted(PNG_ROOT.iterdir()): if not doc_dir.is_dir(): continue pages = sorted(doc_dir.glob("p-*.png")) if pages: out.append((doc_dir.name, len(pages))) return out def is_done(doc_id: str) -> bool: """A doc is done if raw/--subagent/{document.md,_index.json,chunks/} all exist.""" archive = RAW_ROOT / f"{doc_id}--subagent" return (archive / "document.md").exists() and (archive / "_index.json").exists() and (archive / "chunks").is_dir() QUOTA_MARKERS = ( "monthly usage limit", "usage limit", "rate limit exceeded", ) def looks_like_quota_error(result_excerpt: str, raw_stdout: str) -> bool: blob = (result_excerpt or "") + " " + (raw_stdout or "") blob = blob.lower() return any(m in blob for m in QUOTA_MARKERS) def rebuild_one(doc_id: str, page_count: int, timeout_s: int) -> dict: """Run ONE `claude -p` subprocess for ONE document. Fresh context.""" global quota_exhausted if quota_exhausted: return { "doc_id": doc_id, "page_count": page_count, "started_at": utc_iso(), "finished_at": utc_iso(), "wall_seconds": 0, "returncode": -3, "timed_out": False, "success": False, "skipped": True, "skip_reason": "quota_exhausted_already_detected", "chunks_count": 0, "images_count": 0, } out_dir = RAW_ROOT / doc_id archive = RAW_ROOT / f"{doc_id}--subagent" # Wipe any half-built state, start clean if out_dir.exists(): shutil.rmtree(out_dir) if archive.exists(): shutil.rmtree(archive) out_dir.mkdir(parents=True, exist_ok=True) (out_dir / "chunks").mkdir(exist_ok=True) (out_dir / "images").mkdir(exist_ok=True) (out_dir / "tables").mkdir(exist_ok=True) prompt = PROMPT.format(doc_id=doc_id, page_count=page_count) cmd = [ "claude", "-p", "--model", "sonnet", "--output-format", "json", "--max-turns", "120", "--allowedTools", "Read,Write,Bash,Task", "--add-dir", str(UFO_ROOT), "--", prompt, ] t0 = time.time() started_at = utc_iso() try: proc = subprocess.run( cmd, capture_output=True, text=True, env={**os.environ}, check=False, timeout=timeout_s, ) timed_out = False except subprocess.TimeoutExpired as e: proc = e timed_out = True wall = round(time.time() - t0, 1) # Parse Claude CLI JSON output cli: dict = {} raw_stdout = getattr(proc, "stdout", "") or "" if not isinstance(raw_stdout, str): raw_stdout = raw_stdout.decode("utf-8", errors="replace") if raw_stdout else "" if raw_stdout: try: cli = json.loads(raw_stdout) except json.JSONDecodeError: cli = {"raw_stdout_tail": raw_stdout[-2000:]} rc = getattr(proc, "returncode", -1) if not timed_out else -2 # Inspect output doc_md = out_dir / "document.md" idx_json = out_dir / "_index.json" chunks_dir = out_dir / "chunks" images_dir = out_dir / "images" chunks_count = len(list(chunks_dir.glob("c*.md"))) if chunks_dir.exists() else 0 images_count = len(list(images_dir.glob("*.png"))) if images_dir.exists() else 0 has_doc = doc_md.exists() has_idx = idx_json.exists() success = has_doc and has_idx and chunks_count > 0 and not timed_out # Archive on success → raw/--subagent/ if success: if archive.exists(): shutil.rmtree(archive) shutil.move(str(out_dir), str(archive)) result_excerpt = (cli.get("result") or "")[:500] record = { "doc_id": doc_id, "page_count": page_count, "started_at": started_at, "finished_at": utc_iso(), "wall_seconds": wall, "returncode": rc, "timed_out": timed_out, "success": success, "has_document_md": has_doc, "has_index_json": has_idx, "chunks_count": chunks_count, "images_count": images_count, "total_cost_usd": cli.get("total_cost_usd"), "num_turns": cli.get("num_turns"), "is_error": cli.get("is_error"), "usage": cli.get("usage"), "result_excerpt": result_excerpt, } # Detect Anthropic quota errors and flip the global stop flag. if not success and looks_like_quota_error(result_excerpt, raw_stdout): record["quota_error"] = True quota_exhausted = True append_jsonl(PROGRESS_LOG, record) if not success: append_jsonl(FAILED_LOG, record) return record def already_processed_ids() -> set[str]: """Read progress.jsonl to see which doc_ids have a 'success' record.""" done: set[str] = set() if not PROGRESS_LOG.exists(): return done with PROGRESS_LOG.open("r", encoding="utf-8") as fh: for line in fh: try: r = json.loads(line) if r.get("success"): done.add(r["doc_id"]) except json.JSONDecodeError: continue return done def main(): ap = argparse.ArgumentParser() ap.add_argument("--workers", type=int, default=2) ap.add_argument("--limit", type=int, default=None) ap.add_argument("--doc-id", default=None, help="Single doc id") ap.add_argument("--force", action="store_true", help="Rebuild even if archive exists") ap.add_argument("--timeout-per-page", type=int, default=300, help="Seconds per page (default 300 = 5min)") ap.add_argument("--min-timeout", type=int, default=900, help="Minimum doc timeout") ap.add_argument("--max-timeout", type=int, default=14400, help="Maximum doc timeout (4h)") args = ap.parse_args() all_docs = discover_docs() if args.doc_id: all_docs = [(d, p) for d, p in all_docs if d == args.doc_id] if not all_docs: sys.stderr.write(f"✗ doc_id '{args.doc_id}' not found in {PNG_ROOT}\n") sys.exit(1) # Filter already-done unless --force already = already_processed_ids() if not args.force else set() queue: list[tuple[str, int]] = [] skipped_done = 0 for doc_id, pages in all_docs: if not args.force and (doc_id in already or is_done(doc_id)): skipped_done += 1 continue queue.append((doc_id, pages)) if args.limit: queue = queue[: args.limit] print(f"=" * 70) print(f" BATCH REBUILD — {len(queue)} docs queued, {skipped_done} already done") print(f" workers: {args.workers} · 1 doc per subprocess (clean context)") print(f" started: {utc_iso()}") print(f" progress log: {PROGRESS_LOG}") print(f"=" * 70) sys.stdout.flush() batch_t0 = time.time() completed = 0 successes = 0 failures = 0 total_cost = 0.0 total_chunks = 0 with ThreadPoolExecutor(max_workers=args.workers) as ex: futures = {} for doc_id, pages in queue: timeout = max(args.min_timeout, min(args.max_timeout, pages * args.timeout_per_page)) fut = ex.submit(rebuild_one, doc_id, pages, timeout) futures[fut] = (doc_id, pages, timeout) for fut in as_completed(futures): doc_id, pages, timeout = futures[fut] completed += 1 try: r = fut.result() except Exception as e: r = {"doc_id": doc_id, "success": False, "exception": str(e)} append_jsonl(FAILED_LOG, r) if r.get("skipped"): marker = "⊘" failures += 1 elif r.get("success"): successes += 1 total_cost += r.get("total_cost_usd") or 0.0 total_chunks += r.get("chunks_count") or 0 marker = "✓" else: failures += 1 marker = "✗" if r.get("quota_error"): marker = "💸" wall_doc = r.get("wall_seconds", 0) chunks = r.get("chunks_count", 0) cost = r.get("total_cost_usd") or 0.0 elapsed = round(time.time() - batch_t0, 0) print(f" [{completed}/{len(queue)}] {marker} {doc_id} · pages={pages} chunks={chunks} cost=${cost:.2f} wall={wall_doc}s · batch_elapsed={int(elapsed)}s") sys.stdout.flush() if quota_exhausted: print(f"\n ⚠ QUOTA EXHAUSTED — stopping batch. Re-run later (rolling 5h window).") for f in futures: if not f.done(): f.cancel() break summary = { "started_at": utc_iso(), "queue_size": len(queue), "completed": completed, "successes": successes, "failures": failures, "total_cost_usd": round(total_cost, 2), "total_chunks": total_chunks, "batch_wall_seconds": round(time.time() - batch_t0, 1), "workers": args.workers, } SUMMARY_LOG.write_text(json.dumps(summary, indent=2)) print(f"\n{'=' * 70}") print(f" DONE — {successes}/{completed} succeeded · ${total_cost:.2f} total · {total_chunks} chunks") print(f" summary: {SUMMARY_LOG}") print(f"{'=' * 70}") if __name__ == "__main__": main()