#!/usr/bin/env python3
"""
run.py — Re-extract a document via the Claude Code CLI, with chunked
processing for large docs.

Strategy:
  - Build the doc text from already-extracted chunks (build_doc_text.py).
  - Split the text into overlapping segments of at most SEGMENT_INPUT_CHARS
    characters, cutting only at `[chunk c... · ...]` marker lines. A doc that
    fits the budget is a single segment.
  - For every segment, run one CLI call per entry in PASSES. Each call returns
    one slice of the JSON (events, people, organizations + locations,
    relations, document-level fields).
  - With more than one segment, MERGE the per-segment JSONs: events are
    deduped by (label, date_start); people / organizations / locations by
    case-insensitive name; relations by
    (source_class, source_name, type, target_class, target_name).
    evidence_chunks are unioned on every merge.

The goal is that the merged JSON covers the entire document — no entity is
dropped because the doc was "too big".

Exit status: the validator's return code when it is non-zero, otherwise 1 if
any (segment, pass) call produced unusable output, otherwise 0.
"""
from __future__ import annotations

import json
import os
import re
import subprocess
import sys
import tempfile
from bisect import bisect_left, bisect_right
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

# Nothing in this module calls into PyYAML (enums.yaml is read as raw text),
# so a missing package must not stop the script from running. The name is
# kept in scope for anything that imports it from here.
try:
    import yaml  # noqa: F401
except ImportError:  # pragma: no cover - depends on the environment
    yaml = None

REX_DIR = Path("/Users/guto/ufo/scripts/reextract")
RAW = Path("/Users/guto/ufo/raw")
BUILD_DOC = REX_DIR / "build_doc_text.py"
PROMPT_SYSTEM = REX_DIR / "prompt-system.md"
ENUMS_YAML = REX_DIR / "enums.yaml"
VALIDATE = REX_DIR / "validate.py"

# Model alias passed to `claude --model`.
CLAUDE_MODEL = "opus"

# Token budget per call.
# Both Sonnet and Opus cap output at 32k tokens. We partition the extraction
# into 5 separate calls per segment, each producing a small piece of the JSON.
# Each piece is well under the ceiling.
SEGMENT_INPUT_CHARS = 60_000  # ~15k tokens input per segment
SEGMENT_OVERLAP_CHARS = 3_000

# Start-of-line chunk markers emitted by build_doc_text.py.
_CHUNK_MARKER_RE = re.compile(r"^\[chunk c\d+ ·.*$", re.MULTILINE)

# Tools switched off for every CLI call (see call_claude).
DISALLOWED_TOOLS = (
    "AskUserQuestion,Bash,Edit,Write,Read,Task,Glob,Grep,"
    "TaskCreate,TaskUpdate,TaskList,TaskGet,TaskStop,TaskOutput,"
    "Skill,ScheduleWakeup,Monitor,WebSearch,WebFetch,NotebookEdit,"
    "EnterPlanMode,ExitPlanMode,EnterWorktree,ExitWorktree,"
    "CronCreate,CronDelete,CronList,RemoteTrigger,ToolSearch,"
    "PushNotification,ListMcpResourcesTool,ReadMcpResourceTool,"
    "ShareOnboardingGuide"
)

# Per-segment extraction is split into 5 passes. Each pass gets the same
# document text (so the claude CLI reuses its prompt cache) but a different
# "output mode" instruction asking for ONE category only.
PASSES = [
    ("events",
     "Return a JSON object with ONLY the events array, exhaustively extracted "
     "from THIS segment:\n\n"
     "{\"events\": [{...event objects per the schema...}]}\n\n"
     "Use the event schema rules from the system prompt. Include "
     "uap_objects_observed, observers (with role_at_event), "
     "primary_location_name/geo_class, evidence_chunks, both narratives. "
     "Do NOT include people/organizations/locations/relations/doc-level fields."),
    ("people",
     "Return a JSON object with ONLY the people array, exhaustively extracted "
     "from THIS segment:\n\n"
     "{\"people\": [{...person objects per the schema...}]}\n\n"
     "Use the person schema rules. Each entry: name, aliases_in_doc, "
     "person_class, affiliation, role_at_doc_date, evidence_chunks, confidence. "
     "Skip pure routing-list entries (FBI distribution slips, etc.) unless they "
     "are subjects/witnesses/authors of real content."),
    ("orgs_locs",
     "Return a JSON object with ONLY organizations[] and locations[], exhaustively "
     "extracted from THIS segment:\n\n"
     "{\"organizations\": [...], \"locations\": [...]}\n\n"
     "Use the schema rules. Include aliases_in_doc, class enums, country, "
     "region_or_state (locations), evidence_chunks."),
    ("relations",
     "Return a JSON object with ONLY the relations array, exhaustively extracted "
     "from THIS segment:\n\n"
     "{\"relations\": [{...relation objects...}]}\n\n"
     "Priority: typed investigative relations (witnessed, occurred_at, signed, "
     "involves_uap, investigated, authored, employed_by, commanded). "
     "mentioned_by ONLY for clearly investigative people (authors of memos, "
     "subjects of investigations); skip mentioned_by for pure routing-list "
     "names (Tolson, Ladd routing slips). All other types as found."),
    ("doc_meta",
     "Return a JSON object with ONLY the document-level fields:\n\n"
     "{\"doc_classification\": \"...\", \"doc_classification_note\": null|\"...\", "
     "\"doc_period\": \"YYYY\" or \"YYYY-YYYY\", "
     "\"primary_topics\": [\"...\"], "
     "\"noise_emission\": \"...\", \"investigative_value\": \"...\", "
     "\"doc_summary_en\": \"2-3 sentences\", \"doc_summary_pt_br\": \"2-3 frases\"}\n\n"
     "These reflect the FULL document this segment is part of (you may not see "
     "every page in this segment, but classify based on what you do see)."),
]

# Rankings used by merge_extractions; a higher number wins.
_NOISE_ORDER = {"none": 0, "low": 1, "medium": 2, "high": 3}
_INV_ORDER = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}
_CONF_ORDER = {"low": 0, "medium": 1, "high": 2}
_FIRST_NON_NULL_FIELDS = (
    "doc_classification", "doc_classification_note", "doc_period",
    "doc_summary_en", "doc_summary_pt_br",
)


def build_doc_text(doc_id: str) -> str:
    """Run build_doc_text.py for *doc_id* and return its stdout.

    Exits the process (SystemExit carrying the helper's stderr) when the
    helper returns a non-zero status.
    """
    r = subprocess.run(
        ["python3", str(BUILD_DOC), doc_id],
        capture_output=True, text=True, encoding="utf-8",
    )
    if r.returncode != 0:
        sys.exit(f"build_doc_text failed: {r.stderr}")
    return r.stdout


def segment_doc(text: str) -> list[str]:
    """Split doc text into overlapping segments at chunk-marker boundaries.

    A segment ends at the last `[chunk c... · ...]` marker line that lies
    before the SEGMENT_INPUT_CHARS cap, so a chunk is only ever cut when it
    is by itself larger than the cap. The next segment starts at the first
    marker within SEGMENT_OVERLAP_CHARS before that cut, which repeats the
    trailing chunk(s) as overlap.

    Text that fits the budget, or that contains no markers at all, is
    returned as a single segment.
    """
    if len(text) <= SEGMENT_INPUT_CHARS:
        return [text]

    # Marker offsets in increasing order (finditer scans left to right).
    starts = [m.start() for m in _CHUNK_MARKER_RE.finditer(text)]
    if not starts:
        return [text]

    segments: list[str] = []
    seg_start = 0
    while seg_start < len(text):
        cap = seg_start + SEGMENT_INPUT_CHARS
        if cap >= len(text):
            segments.append(text[seg_start:])
            break

        # Markers strictly inside (seg_start, cap) are starts[lo:hi].
        lo = bisect_right(starts, seg_start)
        hi = bisect_left(starts, cap)
        # Cut at the LAST such marker; with none, an oversized chunk is cut
        # at the cap itself.
        seg_end = starts[hi - 1] if hi > lo else cap
        segments.append(text[seg_start:seg_end])

        # The next start must lie strictly after the current one. Without
        # that floor, a short chunk followed by an oversized one makes the
        # overlap search return the current start again and the loop never
        # terminates.
        floor = max(seg_end - SEGMENT_OVERLAP_CHARS, seg_start + 1)
        k = bisect_left(starts, floor)
        if k < len(starts) and starts[k] < seg_end:
            seg_start = starts[k]
        else:
            seg_start = seg_end
    return segments


def build_full_prompt(doc_id: str, doc_text: str, segment_meta: str = "",
                      pass_instruction: str = "",
                      error_feedback: str = "") -> str:
    """Assemble the complete prompt text for one CLI call.

    Layout: system prompt, closed enums (raw YAML text), the document text
    in a fenced block, then the optional per-pass output-mode instruction
    and the optional validation-feedback block. The system prompt and enums
    files are read from disk on every call.
    """
    system = PROMPT_SYSTEM.read_text(encoding="utf-8")
    enums = ENUMS_YAML.read_text(encoding="utf-8")

    feedback_block = ""
    if error_feedback:
        feedback_block = (
            "\n\n## PREVIOUS ATTEMPT FAILED VALIDATION\n\n"
            "Your previous JSON had these errors. Fix them and return a corrected JSON:\n"
            f"```\n{error_feedback}\n```\n\n"
            "Re-emit the FULL corrected JSON.\n"
        )

    pass_block = ""
    if pass_instruction:
        pass_block = (
            "\n\n## OUTPUT MODE (THIS CALL ONLY)\n\n"
            f"{pass_instruction}\n\n"
            "Return ONLY the JSON object described above. No markdown fence, "
            "no preamble, no postscript. JSON only.\n"
        )

    return (
        f"{system}\n\n"
        "## CLOSED ENUMS (use ONLY these values)\n\n"
        f"```yaml\n{enums}\n```\n\n"
        f"## DOCUMENT TO ANALYZE — doc_id: {doc_id}\n"
        f"{segment_meta}\n\n"
        f"```\n{doc_text}\n```\n"
        f"{pass_block}"
        f"{feedback_block}\n"
        "Return the JSON now."
    )


def call_claude(prompt: str, model: str = CLAUDE_MODEL) -> tuple[str, dict]:
    """Run one `claude -p` call and return (response_text, meta_dict).

    The prompt is sent on stdin. stdout is redirected straight into a temp
    file instead of a pipe; the original author's note is that pipe capture
    truncated large outputs. Tools are disabled to prevent the model from
    initiating multi-turn calls that fail in `-p` mode.

    `--output-format text` carries no metadata, so meta_dict only holds a
    placeholder stop_reason and the character count of the output.

    Exits the process (SystemExit with the first 4000 chars of stderr) when
    the CLI returns a non-zero status.
    """
    env = {**os.environ, "CLAUDE_CODE_MAX_OUTPUT_TOKENS": "32000"}

    # Only the path is needed here; the file is reopened in binary mode below.
    with tempfile.NamedTemporaryFile(mode="w+", suffix=".txt", delete=False,
                                     encoding="utf-8") as tmp:
        tmp_path = tmp.name
    try:
        with open(tmp_path, "wb") as out_f:
            r = subprocess.run(
                ["claude", "-p",
                 "--model", model,
                 "--output-format", "text",
                 "--disallowed-tools", DISALLOWED_TOOLS],
                input=prompt.encode("utf-8"),
                stdout=out_f,
                stderr=subprocess.PIPE,
                env=env,
            )
        if r.returncode != 0:
            sys.exit(
                f"claude CLI failed (rc={r.returncode})\n"
                f"stderr: {r.stderr.decode('utf-8', errors='replace')[:4000]}"
            )
        with open(tmp_path, "r", encoding="utf-8") as f:
            output = f.read()
        return (output, {"stop_reason": "text-format-no-meta", "chars": len(output)})
    finally:
        # Best-effort cleanup: a leftover temp file is not worth failing for.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def extract_json_block(s: str) -> str:
    """Return the outermost `{...}` span of a model response.

    When the response starts with a markdown fence, every line starting
    with ``` is dropped first. If no brace pair is found, the stripped text
    is returned unchanged so the caller's json.loads reports the problem.
    """
    s = s.strip()
    if s.startswith("```"):
        s = "\n".join(line for line in s.splitlines() if not line.startswith("```"))
        s = s.strip()
    start = s.find("{")
    end = s.rfind("}")
    if start >= 0 and end > start:
        return s[start:end + 1]
    return s


def merge_extractions(segments: list[dict], doc_id: str) -> dict:
    """Merge per-segment JSONs into a single doc-level JSON.

    Dedup rules:
      - events:    by (lowercased label, date_start)
      - people:    by lowercased, stripped name
      - orgs:      by lowercased, stripped name
      - locations: by lowercased, stripped name
      - relations: by (source_class, lowercased source_name, type,
                   target_class, lowercased target_name)

    On a duplicate, evidence_chunks are unioned (sorted). Entities also
    union aliases_in_doc (minus the kept name) and keep the highest
    confidence; every other field comes from the first occurrence.

    Top-level fields (classification, period, summaries) come from the
    FIRST segment with a truthy value; primary_topics is an order-preserving
    union; noise_emission / investigative_value take the highest-ranked
    value seen across segments.

    The input dicts are not modified: kept items are shallow copies.
    """
    out: dict = {
        "doc_id": doc_id,
        "doc_classification": None,
        "doc_classification_note": None,
        "doc_period": None,
        "primary_topics": [],
        "noise_emission": None,
        "investigative_value": None,
        "doc_summary_en": None,
        "doc_summary_pt_br": None,
        "events": [],
        "people": [],
        "organizations": [],
        "locations": [],
        "relations": [],
    }

    def lower_key(s) -> str:
        return (s or "").strip().lower()

    # Top-level pickup from first non-null
    for seg in segments:
        for k in _FIRST_NON_NULL_FIELDS:
            if not out.get(k) and seg.get(k):
                out[k] = seg[k]

    # primary_topics: union, preserve first-seen order
    seen = set()
    for seg in segments:
        for t in seg.get("primary_topics") or []:
            if t and t not in seen:
                seen.add(t)
                out["primary_topics"].append(t)

    # noise / investigative — highest rank across segments; values that are
    # not in the ranking tables are ignored.
    noise_max, inv_max = None, None
    for seg in segments:
        n = seg.get("noise_emission")
        if n in _NOISE_ORDER and (noise_max is None
                                  or _NOISE_ORDER[n] > _NOISE_ORDER[noise_max]):
            noise_max = n
        i = seg.get("investigative_value")
        if i in _INV_ORDER and (inv_max is None
                                or _INV_ORDER[i] > _INV_ORDER[inv_max]):
            inv_max = i
    out["noise_emission"] = noise_max
    out["investigative_value"] = inv_max

    # Entities — dedup by canonical key
    def merge_list(key: str, key_fn):
        seen_keys: dict = {}
        for seg in segments:
            for item in seg.get(key) or []:
                if not isinstance(item, dict):
                    continue
                k = key_fn(item)
                if not k:
                    continue
                existing = seen_keys.get(k)
                if existing is None:
                    # Copy so later merges never write into the caller's data.
                    seen_keys[k] = dict(item)
                    continue
                # merge: union evidence_chunks, aliases_in_doc; keep highest confidence
                ev = set(existing.get("evidence_chunks") or []) | set(item.get("evidence_chunks") or [])
                existing["evidence_chunks"] = sorted(ev)
                al = set(existing.get("aliases_in_doc") or []) | set(item.get("aliases_in_doc") or [])
                al.discard(existing.get("name", ""))
                existing["aliases_in_doc"] = sorted(al)
                if (_CONF_ORDER.get(item.get("confidence"), 0)
                        > _CONF_ORDER.get(existing.get("confidence"), 0)):
                    existing["confidence"] = item["confidence"]
        return list(seen_keys.values())

    out["events"] = merge_list(
        "events", lambda e: (lower_key(e.get("label")), e.get("date_start") or ""))
    out["people"] = merge_list("people", lambda p: lower_key(p.get("name")))
    out["organizations"] = merge_list("organizations", lambda o: lower_key(o.get("name")))
    out["locations"] = merge_list("locations", lambda l: lower_key(l.get("name")))

    # Relations: dedup by (source_class, source_name_lower, type, target_class, target_name_lower)
    rel_seen: dict = {}
    for seg in segments:
        for r in seg.get("relations") or []:
            if not isinstance(r, dict):
                continue
            key = (
                r.get("source_class"), lower_key(r.get("source_name")),
                r.get("type"),
                r.get("target_class"), lower_key(r.get("target_name")),
            )
            existing = rel_seen.get(key)
            if existing is None:
                rel_seen[key] = dict(r)
            else:
                ev = set(existing.get("evidence_chunks") or []) | set(r.get("evidence_chunks") or [])
                existing["evidence_chunks"] = sorted(ev)
    out["relations"] = list(rel_seen.values())

    return out


def main() -> int:
    """Re-extract the document named by argv[1] and validate the result.

    Writes RAW/<doc_id>--subagent/_reextract.json. For every pass whose
    output could not be used, the raw model text is saved next to it as
    _reextract_raw_seg<N>_<pass>.txt. Returns the process exit status
    described in the module docstring.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: run.py <doc_id>")
    doc_id = sys.argv[1]
    out_path = RAW / f"{doc_id}--subagent" / "_reextract.json"

    print("[1/N] Building doc text ...")
    doc_text = build_doc_text(doc_id)
    print(f" {len(doc_text)} chars (~{len(doc_text) // 4} tokens)")

    segments = segment_doc(doc_text)
    n_seg = len(segments)
    print(f"[2/N] Splitting into {n_seg} segment(s) of ~{SEGMENT_INPUT_CHARS // 1000}k chars each")
    for i, s in enumerate(segments, 1):
        print(f" segment {i}: {len(s)} chars")

    def run_pass(seg_idx: int, seg_text: str, pass_name: str,
                 pass_instr: str) -> tuple[int, str, dict | None, str, str]:
        """Run one (segment, pass) call.

        Returns (seg_idx, pass_name, piece, error, raw); piece is None and
        error is non-empty when the output is not a JSON object.
        """
        meta_label = f"\n\n[SEGMENT {seg_idx + 1} OF {n_seg}] — extract everything in THIS segment exhaustively.\n"
        prompt = build_full_prompt(doc_id, seg_text, segment_meta=meta_label,
                                   pass_instruction=pass_instr)
        raw, _meta = call_claude(prompt)
        json_text = extract_json_block(raw)
        try:
            piece = json.loads(json_text)
        except json.JSONDecodeError as e:
            return (seg_idx, pass_name, None, f"{e} | raw_len={len(raw)}", raw)
        if not isinstance(piece, dict):
            # Valid JSON of the wrong shape carries no usable fields.
            return (seg_idx, pass_name, None,
                    f"expected a JSON object, got {type(piece).__name__} | raw_len={len(raw)}",
                    raw)
        return (seg_idx, pass_name, piece, "", raw)

    # Fire ALL (segment, pass) jobs in parallel
    extracted: list[dict] = [{"doc_id": doc_id} for _ in range(n_seg)]
    n_jobs = n_seg * len(PASSES)
    print(f"[3/N] Firing {n_jobs} parallel passes ({n_seg} segments × {len(PASSES)} passes)")

    # Created up front so raw output of failed passes can be saved as it arrives.
    out_path.parent.mkdir(parents=True, exist_ok=True)

    errors: list[str] = []
    completed = 0
    # Cap inner concurrency so outer-WORKERS × inner doesn't fork-bomb the box.
    # 5 = one slot per pass; segments process serially within a doc.
    INNER_MAX = int(os.environ.get("REEXTRACT_INNER_MAX", "5"))
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least 1.
    workers = max(1, min(INNER_MAX, n_jobs))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(run_pass, seg_idx, seg, pass_name, pass_instr)
            for seg_idx, seg in enumerate(segments)
            for pass_name, pass_instr in PASSES
        ]
        for fut in as_completed(futures):
            # A SystemExit raised by call_claude in a worker is re-raised
            # here and aborts the run.
            seg_idx, pass_name, piece, err, raw = fut.result()
            completed += 1
            tag = f"seg{seg_idx+1}/{pass_name}"
            if err:
                errors.append(f"{tag}: {err}")
                debug = out_path.parent / f"_reextract_raw_seg{seg_idx+1}_{pass_name}.txt"
                try:
                    debug.write_text(raw, encoding="utf-8")
                except OSError as e:
                    print(f" could not save raw output to {debug}: {e}", file=sys.stderr)
                print(f" [{completed}/{n_jobs}] {tag} FAILED — {err[:120]}")
            else:
                # Results are applied here, in the main thread only.
                for k, v in piece.items():
                    extracted[seg_idx][k] = v
                print(f" [{completed}/{n_jobs}] {tag} OK")

    print(f"[4/N] Merging {n_seg} extraction(s) ...")
    if n_seg > 1:
        merged = merge_extractions(extracted, doc_id)
    else:
        merged = {**extracted[0], "doc_id": doc_id}

    out_path.write_text(json.dumps(merged, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f" saved {out_path}")

    if errors:
        print(f" {len(errors)} of {n_jobs} pass(es) failed; the saved JSON is incomplete:",
              file=sys.stderr)
        for line in errors:
            print(f"  {line}", file=sys.stderr)

    print("[5/N] Validating ...")
    v = subprocess.run(
        ["python3", str(VALIDATE), doc_id, str(out_path)],
        capture_output=True, text=True, encoding="utf-8",
    )
    print(v.stdout.strip())
    if v.stderr.strip():
        print(v.stderr.strip(), file=sys.stderr)

    if v.returncode != 0:
        return v.returncode
    # Validation passing does not make up for slices that never arrived.
    return 1 if errors else 0


if __name__ == "__main__":
    sys.exit(main())