356 lines
12 KiB
Python
Executable file
356 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
28-batch-rebuild-all.py — Batch-rebuild ALL declassified UAP/UFO documents into
the agentic chunk schema (raw/<doc-id>/document.md + chunks/ + _index.json).
A successful build is then moved to raw/<doc-id>--subagent/.

CRITICAL RULE (user-mandated): each document runs in its OWN fresh `claude -p`
subprocess with empty context. Never accumulate multiple docs in one session.

Workers run in parallel at the SUBPROCESS level only — never multi-doc inside
one Claude session. Default: 2 parallel workers.

Usage:
    ./28-batch-rebuild-all.py                  # 2 workers, all unbuilt docs
    ./28-batch-rebuild-all.py --workers 4
    ./28-batch-rebuild-all.py --limit 5        # smoke
    ./28-batch-rebuild-all.py --doc-id doc-X   # single doc
    ./28-batch-rebuild-all.py --resume         # skip docs already done (default behaviour)
    ./28-batch-rebuild-all.py --force          # rebuild even if exists
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from threading import Lock
|
|
|
|
|
|
# Filesystem layout: page PNGs are read from processing/png/<doc-id>/ and the
# rebuilt documents are written under raw/.
UFO_ROOT = Path("/Users/guto/ufo")
PNG_ROOT = UFO_ROOT.joinpath("processing", "png")
RAW_ROOT = UFO_ROOT.joinpath("raw")
LOG_DIR = RAW_ROOT.joinpath("_batch-rebuild")
# Created at import time so the log files below can always be written.
LOG_DIR.mkdir(parents=True, exist_ok=True)

# One JSON record per attempted doc, failures only, and the end-of-batch totals.
PROGRESS_LOG = LOG_DIR.joinpath("progress.jsonl")
FAILED_LOG = LOG_DIR.joinpath("failed.jsonl")
SUMMARY_LOG = LOG_DIR.joinpath("summary.json")
|
|
|
|
PROMPT = """Rebuild the document `{doc_id}` into `raw/{doc_id}/document.md` using the `doc-rebuilder` subagent.
|
|
|
|
Constraints:
|
|
- Process ALL {page_count} pages (p001 .. p{page_count:03d}).
|
|
- The doc-rebuilder agent spawns `page-rebuilder`, `image-analyst`, and `table-stitcher` subagents in parallel via Task.
|
|
- Output schema: as defined in .claude/agents/doc-rebuilder.md system prompt.
|
|
- Target files:
|
|
- `/Users/guto/ufo/raw/{doc_id}/document.md` (assembled master)
|
|
- `/Users/guto/ufo/raw/{doc_id}/_index.json` (machine-readable chunk index)
|
|
- `/Users/guto/ufo/raw/{doc_id}/chunks/c*.md` (one file per chunk)
|
|
- `/Users/guto/ufo/raw/{doc_id}/images/IMG-c*.png` (cropped image regions)
|
|
- Set frontmatter `build_approach: "subagents"`.
|
|
- Bilingual EN + Brazilian PT-BR. Preserve UTF-8 accents.
|
|
|
|
Begin. When done, output a single line of stats: pages_done, chunks_total, images_extracted, tables_stitched, ufo_anomalies, cryptid_anomalies, wall_seconds."""
|
|
|
|
|
|
# Held by append_jsonl() while it appends a record to one of the JSONL logs.
progress_lock = Lock()
# Global stop flag: set by rebuild_one() when a failed run's output matches a
# quota marker (e.g. "monthly usage limit"); once set, remaining docs are
# skipped and main() stops the batch.
quota_exhausted = False
|
|
|
|
|
|
def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now_utc = datetime.now(timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def append_jsonl(path: Path, record: dict) -> None:
    """Append ``record`` to ``path`` as one JSON line.

    The write happens while holding ``progress_lock``. Non-ASCII characters
    are written as-is (UTF-8), not escaped.
    """
    with progress_lock, path.open("a", encoding="utf-8") as sink:
        line = json.dumps(record, ensure_ascii=False)
        sink.write(line + "\n")
|
|
|
|
|
|
def discover_docs() -> list[tuple[str, int]]:
    """Return ``(doc_id, page_count)`` pairs found under ``PNG_ROOT``.

    Each sub-directory of ``PNG_ROOT`` is one document; its page count is the
    number of files matching ``p-*.png``. Directories without any such file
    are left out. Results are ordered by sorted directory path.
    """
    subdirs = (entry for entry in sorted(PNG_ROOT.iterdir()) if entry.is_dir())
    counted = ((entry.name, len(list(entry.glob("p-*.png")))) for entry in subdirs)
    return [(name, n_pages) for name, n_pages in counted if n_pages]
|
|
|
|
|
|
def is_done(doc_id: str) -> bool:
    """Tell whether ``raw/<doc-id>--subagent/`` holds a complete build.

    Complete means ``document.md`` and ``_index.json`` exist and ``chunks`` is
    a directory.
    """
    archive_dir = RAW_ROOT / f"{doc_id}--subagent"
    if not (archive_dir / "document.md").exists():
        return False
    if not (archive_dir / "_index.json").exists():
        return False
    return (archive_dir / "chunks").is_dir()
|
|
|
|
|
|
# Lower-case substrings that mark a run's output as a quota / rate-limit failure.
QUOTA_MARKERS = ("monthly usage limit", "usage limit", "rate limit exceeded")
|
|
|
|
|
|
def looks_like_quota_error(result_excerpt: str, raw_stdout: str) -> bool:
    """Return True if either text contains one of ``QUOTA_MARKERS``.

    The match is case-insensitive; empty or ``None`` inputs are treated as
    empty strings.
    """
    haystack = " ".join((result_excerpt or "", raw_stdout or "")).lower()
    for marker in QUOTA_MARKERS:
        if marker in haystack:
            return True
    return False
|
|
|
|
|
|
def _to_text(stream) -> str:
    """Return captured subprocess output as ``str``, decoding bytes leniently."""
    if not stream:
        return ""
    if isinstance(stream, bytes):
        return stream.decode("utf-8", errors="replace")
    return stream


def rebuild_one(doc_id: str, page_count: int, timeout_s: int) -> dict:
    """Run ONE `claude -p` subprocess for ONE document. Fresh context.

    The subprocess writes into ``raw/<doc_id>/``. A run counts as successful
    when it did not time out and ``document.md``, ``_index.json`` and at least
    one ``chunks/c*.md`` file exist; the directory is then moved to
    ``raw/<doc_id>--subagent/``, replacing any previous archive.

    Args:
        doc_id: Document id, also the directory name under ``raw/``.
        page_count: Number of pages, interpolated into the prompt.
        timeout_s: Wall-clock limit for the subprocess, in seconds.

    Returns:
        A JSON-serialisable record describing the run. The same record is
        appended to the progress log and, on failure, to the failed log.
        If the quota flag is already set, a "skipped" record is returned
        without running anything and without logging.
    """
    global quota_exhausted
    if quota_exhausted:
        now = utc_iso()
        return {
            "doc_id": doc_id,
            "page_count": page_count,
            "started_at": now,
            "finished_at": now,
            "wall_seconds": 0,
            "returncode": -3,
            "timed_out": False,
            "success": False,
            "skipped": True,
            "skip_reason": "quota_exhausted_already_detected",
            "chunks_count": 0,
            "images_count": 0,
        }

    out_dir = RAW_ROOT / doc_id
    archive = RAW_ROOT / f"{doc_id}--subagent"

    # Wipe any half-built working state and start clean. The previous archive
    # is deliberately left alone here: it is only replaced once this run has
    # succeeded, so a failed or timed-out rebuild cannot destroy a good build.
    if out_dir.exists():
        shutil.rmtree(out_dir)
    for sub in ("chunks", "images", "tables"):
        (out_dir / sub).mkdir(parents=True, exist_ok=True)

    prompt = PROMPT.format(doc_id=doc_id, page_count=page_count)
    cmd = [
        "claude", "-p",
        "--model", "sonnet",
        "--output-format", "json",
        "--max-turns", "120",
        "--allowedTools", "Read,Write,Bash,Task",
        "--add-dir", str(UFO_ROOT),
        "--",
        prompt,
    ]

    t0 = time.monotonic()  # monotonic: immune to system clock adjustments
    started_at = utc_iso()
    timed_out = False
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Decode explicitly as UTF-8 instead of the locale encoding, and
            # never let an undecodable byte abort the whole run.
            encoding="utf-8",
            errors="replace",
            env={**os.environ},
            check=False,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired as exc:
        # The exception carries whatever output was captured before the kill.
        proc = exc
        timed_out = True
    wall = round(time.monotonic() - t0, 1)

    # TimeoutExpired may expose bytes (or None) even in text mode.
    raw_stdout = _to_text(getattr(proc, "stdout", None))
    raw_stderr = _to_text(getattr(proc, "stderr", None))

    # Parse Claude CLI JSON output; anything that is not a JSON object is kept
    # only as a tail for debugging.
    cli: dict = {}
    if raw_stdout:
        try:
            parsed = json.loads(raw_stdout)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            cli = parsed
        else:
            cli = {"raw_stdout_tail": raw_stdout[-2000:]}

    rc = -2 if timed_out else getattr(proc, "returncode", -1)

    # Inspect output
    doc_md = out_dir / "document.md"
    idx_json = out_dir / "_index.json"
    chunks_dir = out_dir / "chunks"
    images_dir = out_dir / "images"

    chunks_count = len(list(chunks_dir.glob("c*.md"))) if chunks_dir.exists() else 0
    images_count = len(list(images_dir.glob("*.png"))) if images_dir.exists() else 0
    has_doc = doc_md.exists()
    has_idx = idx_json.exists()

    success = has_doc and has_idx and chunks_count > 0 and not timed_out

    # Archive on success → raw/<doc-id>--subagent/
    if success:
        if archive.exists():
            shutil.rmtree(archive)
        shutil.move(str(out_dir), str(archive))

    result_excerpt = str(cli.get("result") or "")[:500]
    record = {
        "doc_id": doc_id,
        "page_count": page_count,
        "started_at": started_at,
        "finished_at": utc_iso(),
        "wall_seconds": wall,
        "returncode": rc,
        "timed_out": timed_out,
        "success": success,
        "has_document_md": has_doc,
        "has_index_json": has_idx,
        "chunks_count": chunks_count,
        "images_count": images_count,
        "total_cost_usd": cli.get("total_cost_usd"),
        "num_turns": cli.get("num_turns"),
        "is_error": cli.get("is_error"),
        "usage": cli.get("usage"),
        "result_excerpt": result_excerpt,
    }

    if not success:
        # Keep the end of stderr so failures can be diagnosed from the log.
        record["stderr_tail"] = raw_stderr[-2000:]
        # Detect Anthropic quota errors (on stdout or stderr) and flip the
        # global stop flag so the remaining docs are skipped.
        if looks_like_quota_error(result_excerpt, raw_stdout + " " + raw_stderr):
            record["quota_error"] = True
            quota_exhausted = True

    append_jsonl(PROGRESS_LOG, record)
    if not success:
        append_jsonl(FAILED_LOG, record)

    return record
|
|
|
|
|
|
def already_processed_ids() -> set[str]:
    """Return the doc_ids whose most recent progress.jsonl record is a success.

    Records are read in file order and the latest record for a doc wins, so a
    success that was later followed by a failed re-run is not reported as
    done. Blank lines, lines that are not valid JSON, and records that are not
    objects or lack a string ``doc_id`` are ignored.

    Returns:
        The set of doc_ids considered done according to the log; empty when
        the log does not exist.
    """
    done: set[str] = set()
    if not PROGRESS_LOG.exists():
        return done
    with PROGRESS_LOG.open("r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(rec, dict):
                continue
            doc_id = rec.get("doc_id")
            if not isinstance(doc_id, str):
                continue
            if rec.get("success"):
                done.add(doc_id)
            else:
                # A later failure supersedes an earlier success for this doc.
                done.discard(doc_id)
    return done
|
|
|
|
|
|
def main():
    """Parse CLI options, build the work queue and run the batch rebuild.

    Documents come from discover_docs(). Unless ``--force`` is given, docs that
    already have a success record or a complete archive are skipped. Each
    queued doc is rebuilt by rebuild_one() in a thread pool; the per-doc
    timeout is ``pages * --timeout-per-page`` clamped to
    ``[--min-timeout, --max-timeout]``. Once the quota flag is set, pending
    docs are cancelled and the batch stops. Totals are written to SUMMARY_LOG.

    Exits with status 1 when ``--doc-id`` names an unknown document, and with
    an argparse usage error when ``--workers`` is below 1.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--workers", type=int, default=2)
    ap.add_argument("--limit", type=int, default=None)
    ap.add_argument("--doc-id", default=None, help="Single doc id")
    ap.add_argument("--force", action="store_true", help="Rebuild even if archive exists")
    # Skipping finished docs is already the default; the flag is accepted so
    # the invocation shown in the module usage does not fail to parse.
    ap.add_argument("--resume", action="store_true", help="Skip docs already done (default behaviour)")
    ap.add_argument("--timeout-per-page", type=int, default=300, help="Seconds per page (default 300 = 5min)")
    ap.add_argument("--min-timeout", type=int, default=900, help="Minimum doc timeout")
    ap.add_argument("--max-timeout", type=int, default=14400, help="Maximum doc timeout (4h)")
    args = ap.parse_args()
    if args.workers < 1:
        ap.error("--workers must be at least 1")

    all_docs = discover_docs()
    if args.doc_id:
        all_docs = [(d, p) for d, p in all_docs if d == args.doc_id]
        if not all_docs:
            sys.stderr.write(f"✗ doc_id '{args.doc_id}' not found in {PNG_ROOT}\n")
            sys.exit(1)

    # Filter already-done unless --force
    already = already_processed_ids() if not args.force else set()
    queue: list[tuple[str, int]] = []
    skipped_done = 0
    for doc_id, pages in all_docs:
        if not args.force and (doc_id in already or is_done(doc_id)):
            skipped_done += 1
            continue
        queue.append((doc_id, pages))

    # `is not None` so that an explicit `--limit 0` queues nothing.
    if args.limit is not None:
        queue = queue[: max(args.limit, 0)]

    # Captured once, up front, so the summary reports the real start time.
    batch_started_at = utc_iso()

    print("=" * 70)
    print(f" BATCH REBUILD — {len(queue)} docs queued, {skipped_done} already done")
    print(f" workers: {args.workers} · 1 doc per subprocess (clean context)")
    print(f" started: {batch_started_at}")
    print(f" progress log: {PROGRESS_LOG}")
    print("=" * 70)
    sys.stdout.flush()

    batch_t0 = time.monotonic()
    completed = 0
    successes = 0
    failures = 0
    total_cost = 0.0
    total_chunks = 0

    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        futures = {}
        for doc_id, pages in queue:
            timeout = max(args.min_timeout, min(args.max_timeout, pages * args.timeout_per_page))
            fut = ex.submit(rebuild_one, doc_id, pages, timeout)
            futures[fut] = (doc_id, pages, timeout)

        for fut in as_completed(futures):
            doc_id, pages, timeout = futures[fut]
            completed += 1
            try:
                r = fut.result()
            except Exception as e:
                # Worker crashed before it could build its own record.
                r = {"doc_id": doc_id, "success": False, "exception": str(e)}
                append_jsonl(FAILED_LOG, r)

            if r.get("skipped"):
                marker = "⊘"
                failures += 1
            elif r.get("success"):
                successes += 1
                total_cost += r.get("total_cost_usd") or 0.0
                total_chunks += r.get("chunks_count") or 0
                marker = "✓"
            else:
                failures += 1
                marker = "✗"
                if r.get("quota_error"):
                    marker = "💸"
            wall_doc = r.get("wall_seconds", 0)
            chunks = r.get("chunks_count", 0)
            cost = r.get("total_cost_usd") or 0.0
            elapsed = round(time.monotonic() - batch_t0, 0)
            print(f" [{completed}/{len(queue)}] {marker} {doc_id} · pages={pages} chunks={chunks} cost=${cost:.2f} wall={wall_doc}s · batch_elapsed={int(elapsed)}s")
            sys.stdout.flush()

            if quota_exhausted:
                print("\n ⚠ QUOTA EXHAUSTED — stopping batch. Re-run later (rolling 5h window).")
                # Drop everything that has not started; leaving the `with`
                # block still waits for workers that are already running.
                for f in futures:
                    if not f.done():
                        f.cancel()
                break

    summary = {
        "started_at": batch_started_at,
        "finished_at": utc_iso(),
        "queue_size": len(queue),
        "completed": completed,
        "successes": successes,
        "failures": failures,
        "total_cost_usd": round(total_cost, 2),
        "total_chunks": total_chunks,
        "batch_wall_seconds": round(time.monotonic() - batch_t0, 1),
        "workers": args.workers,
    }
    SUMMARY_LOG.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(f"\n{'=' * 70}")
    print(f" DONE — {successes}/{completed} succeeded · ${total_cost:.2f} total · {total_chunks} chunks")
    print(f" summary: {SUMMARY_LOG}")
    print(f"{'=' * 70}")
|
|
|
|
|
|
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|