disclosure-bureau/scripts/reextract/run.py

418 lines
17 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
run.py — Re-extract a document via the Claude Code CLI (OAuth), with chunked
processing for large docs. The model is selected in call_claude() (currently
`--model opus`).

Strategy:
- Build doc text from already-extracted chunks (build_doc_text.py).
- If the text fits in one segment (SEGMENT_INPUT_CHARS, ~15k input tokens),
  process it as a single segment.
- Otherwise, split the doc into overlapping segments of about that size, cut
  at chunk-marker boundaries (preserving chunk_id markers).
- Run every segment through each extraction pass listed in PASSES; each pass
  returns one slice of the JSON (events, people, ...).
- MERGE the per-segment JSONs, deduping entities by lowercased name (events
  by label + date_start) and relations by
  (source_class, source_name, type, target_class, target_name), with
  evidence_chunks unioned.

The merged JSON is meant to cover the entire document — no entity is dropped
because the doc was "too big".
"""
from __future__ import annotations
import json
import os
import re
import subprocess
import sys
from pathlib import Path
import yaml
# Absolute locations of the re-extraction toolkit and of the raw document tree.
# NOTE(review): these look machine-specific — confirm before running elsewhere.
REX_DIR = Path("/Users/guto/ufo/scripts/reextract")
RAW = Path("/Users/guto/ufo/raw")
# Helper script whose stdout is used as the document text (see build_doc_text()).
BUILD_DOC = REX_DIR / "build_doc_text.py"
# System prompt and closed-enum definitions embedded into every prompt
# (see build_full_prompt()).
PROMPT_SYSTEM = REX_DIR / "prompt-system.md"
ENUMS_YAML = REX_DIR / "enums.yaml"
# Validator script that main() runs on the written JSON.
VALIDATE = REX_DIR / "validate.py"
# Token budget per call.
# Both Sonnet and Opus cap output at 32k tokens. We partition the extraction
# into 5 separate calls per segment, each producing a small piece of the JSON.
# Each piece is well under the ceiling.
SEGMENT_INPUT_CHARS = 60_000 # ~15k tokens input per segment
# Target overlap, in characters, between consecutive segments (see segment_doc()).
SEGMENT_OVERLAP_CHARS = 3_000
# Per-segment extraction is split into 5 passes. Each pass gets the same
# document text (so the claude CLI reuses its prompt cache) but a different
# "output mode" instruction asking for ONE category only.
#
# Each entry is a (pass_name, instruction) tuple: main() uses pass_name to tag
# log lines, and the instruction is inserted into the prompt's
# "OUTPUT MODE" section by build_full_prompt(). The instruction strings are
# sent to the model verbatim, so edits here change model behavior.
PASSES = [
# Pass 1: events only.
("events",
"Return a JSON object with ONLY the events array, exhaustively extracted "
"from THIS segment:\n\n"
"{\"events\": [{...event objects per the schema...}]}\n\n"
"Use the event schema rules from the system prompt. Include "
"uap_objects_observed, observers (with role_at_event), "
"primary_location_name/geo_class, evidence_chunks, both narratives. "
"Do NOT include people/organizations/locations/relations/doc-level fields."),
# Pass 2: people only.
("people",
"Return a JSON object with ONLY the people array, exhaustively extracted "
"from THIS segment:\n\n"
"{\"people\": [{...person objects per the schema...}]}\n\n"
"Use the person schema rules. Each entry: name, aliases_in_doc, "
"person_class, affiliation, role_at_doc_date, evidence_chunks, confidence. "
"Skip pure routing-list entries (FBI distribution slips, etc.) unless they "
"are subjects/witnesses/authors of real content."),
# Pass 3: organizations and locations together.
("orgs_locs",
"Return a JSON object with ONLY organizations[] and locations[], exhaustively "
"extracted from THIS segment:\n\n"
"{\"organizations\": [...], \"locations\": [...]}\n\n"
"Use the schema rules. Include aliases_in_doc, class enums, country, "
"region_or_state (locations), evidence_chunks."),
# Pass 4: relations only.
("relations",
"Return a JSON object with ONLY the relations array, exhaustively extracted "
"from THIS segment:\n\n"
"{\"relations\": [{...relation objects...}]}\n\n"
"Priority: typed investigative relations (witnessed, occurred_at, signed, "
"involves_uap, investigated, authored, employed_by, commanded). "
"mentioned_by ONLY for clearly investigative people (authors of memos, "
"subjects of investigations); skip mentioned_by for pure routing-list "
"names (Tolson, Ladd routing slips). All other types as found."),
# Pass 5: document-level fields. This is requested for every segment;
# merge_extractions() later reconciles the per-segment answers.
("doc_meta",
"Return a JSON object with ONLY the document-level fields:\n\n"
"{\"doc_classification\": \"...\", \"doc_classification_note\": null|\"...\", "
"\"doc_period\": \"YYYY\" or \"YYYY-YYYY\", "
"\"primary_topics\": [\"...\"], "
"\"noise_emission\": \"...\", \"investigative_value\": \"...\", "
"\"doc_summary_en\": \"2-3 sentences\", \"doc_summary_pt_br\": \"2-3 frases\"}\n\n"
"These reflect the FULL document this segment is part of (you may not see "
"every page in this segment, but classify based on what you do see)."),
]
def build_doc_text(doc_id: str) -> str:
    """Return the text that the build_doc_text.py helper prints for *doc_id*.

    Runs ``python3 <BUILD_DOC> <doc_id>`` and captures its output as UTF-8.
    If the helper exits non-zero, the whole process exits with a message
    that includes the helper's stderr.
    """
    command = ["python3", str(BUILD_DOC), doc_id]
    proc = subprocess.run(
        command, capture_output=True, text=True, encoding="utf-8",
    )
    if proc.returncode == 0:
        return proc.stdout
    sys.exit(f"build_doc_text failed: {proc.stderr}")
def segment_doc(text: str, max_chars: int | None = None,
                overlap_chars: int | None = None) -> list[str]:
    """Split doc text into overlapping segments at chunk-marker boundaries.

    A segment normally never breaks a chunk — we split at the
    `[chunk c... · ...]` line boundary closest to the char-budget cap. The
    only exception is a single chunk longer than the budget, which is cut at
    the cap because no marker is available.

    Args:
        text: Full document text containing chunk-marker lines.
        max_chars: Character budget per segment. Defaults to
            SEGMENT_INPUT_CHARS.
        overlap_chars: Target overlap between consecutive segments. Defaults
            to SEGMENT_OVERLAP_CHARS.

    Returns:
        The list of segments in document order. ``[text]`` when the text fits
        in one segment or contains no chunk markers.

    Raises:
        ValueError: If ``max_chars`` is not positive.
    """
    if max_chars is None:
        max_chars = SEGMENT_INPUT_CHARS
    if overlap_chars is None:
        overlap_chars = SEGMENT_OVERLAP_CHARS
    if max_chars < 1:
        raise ValueError(f"max_chars must be positive, got {max_chars}")

    if len(text) <= max_chars:
        return [text]

    # Find all chunk marker line positions (line starts).
    marker_re = re.compile(r"^\[chunk c\d+ ·.*$", re.MULTILINE)
    starts = [m.start() for m in marker_re.finditer(text)]
    if not starts:
        return [text]

    segments: list[str] = []
    seg_start = 0
    while seg_start < len(text):
        cap = seg_start + max_chars
        if cap >= len(text):
            segments.append(text[seg_start:])
            break

        # Pick the LAST chunk marker before cap (so we never break a chunk).
        # With no marker in range (one chunk larger than the budget) cut at cap.
        candidates = [s for s in starts if seg_start < s < cap]
        seg_end = candidates[-1] if candidates else cap
        segments.append(text[seg_start:seg_end])

        # Next segment starts at the first chunk marker inside the overlap
        # window. The window is clamped to begin strictly after the current
        # start: when a segment is shorter than the overlap, an unclamped
        # window would select the current (or an earlier) marker again and the
        # loop would never advance.
        overlap_floor = max(seg_end - overlap_chars, seg_start + 1)
        overlap_candidates = [s for s in starts if overlap_floor <= s < seg_end]
        seg_start = overlap_candidates[0] if overlap_candidates else seg_end
    return segments
def build_full_prompt(doc_id: str, doc_text: str, segment_meta: str = "",
                      pass_instruction: str = "",
                      error_feedback: str = "") -> str:
    """Assemble the complete prompt text for one model call.

    The prompt is, in order: the system prompt file, the closed-enum YAML,
    a document header with *doc_id* and *segment_meta*, the fenced
    *doc_text*, an optional "OUTPUT MODE" section (when *pass_instruction*
    is non-empty), an optional validation-feedback section (when
    *error_feedback* is non-empty), and a closing request for the JSON.
    Both prompt files are read from disk on every call.
    """
    system_prompt = PROMPT_SYSTEM.read_text(encoding="utf-8")
    enum_yaml = ENUMS_YAML.read_text(encoding="utf-8")

    parts = [
        f"{system_prompt}\n\n",
        "## CLOSED ENUMS (use ONLY these values)\n\n",
        f"```yaml\n{enum_yaml}\n```\n\n",
        f"## DOCUMENT TO ANALYZE — doc_id: {doc_id}\n",
        f"{segment_meta}\n\n",
        f"```\n{doc_text}\n```\n",
    ]

    # Per-call instruction restricting the output to one category.
    if pass_instruction:
        parts.append(
            "\n\n## OUTPUT MODE (THIS CALL ONLY)\n\n"
            f"{pass_instruction}\n\n"
            "Return ONLY the JSON object described above. No markdown fence, "
            "no preamble, no postscript. JSON only.\n"
        )

    # Validation errors from an earlier attempt, for a corrective retry.
    if error_feedback:
        parts.append(
            "\n\n## PREVIOUS ATTEMPT FAILED VALIDATION\n\n"
            "Your previous JSON had these errors. Fix them and return a corrected JSON:\n"
            f"```\n{error_feedback}\n```\n\n"
            "Re-emit the FULL corrected JSON.\n"
        )

    parts.append("\nReturn the JSON now.")
    return "".join(parts)
def call_claude(prompt: str) -> tuple[str, dict]:
    """Run the `claude` CLI on *prompt* and return ``(response_text, meta)``.

    The prompt is sent on stdin to ``claude -p --model opus --output-format
    text`` with the tools listed below disallowed. The child's stdout is
    written straight into a temporary file, read back as UTF-8, and the file
    is removed afterwards. Author's rationale, kept from the original note:
    capturing through subprocess.PIPE truncated large outputs, and tools are
    disabled so the model cannot start multi-turn calls that fail in `-p`
    mode.

    ``meta`` holds a fixed ``stop_reason`` placeholder and the response length
    in characters. On a non-zero exit code the process exits with a message
    quoting up to 4000 characters of the CLI's stderr.
    """
    import tempfile

    blocked_tools = ",".join((
        "AskUserQuestion", "Bash", "Edit", "Write", "Read", "Task", "Glob", "Grep",
        "TaskCreate", "TaskUpdate", "TaskList", "TaskGet", "TaskStop", "TaskOutput",
        "Skill", "ScheduleWakeup", "Monitor", "WebSearch", "WebFetch", "NotebookEdit",
        "EnterPlanMode", "ExitPlanMode", "EnterWorktree", "ExitWorktree",
        "CronCreate", "CronDelete", "CronList", "RemoteTrigger", "ToolSearch",
        "PushNotification", "ListMcpResourcesTool", "ReadMcpResourceTool",
        "ShareOnboardingGuide",
    ))
    command = [
        "claude", "-p", "--model", "opus", "--output-format", "text",
        "--disallowed-tools", blocked_tools,
    ]
    child_env = dict(os.environ)
    child_env["CLAUDE_CODE_MAX_OUTPUT_TOKENS"] = "32000"

    # Reserve a named file; it is reopened in binary mode for the child below.
    with tempfile.NamedTemporaryFile(mode="w+", suffix=".txt", delete=False, encoding="utf-8") as handle:
        capture_path = handle.name

    try:
        with open(capture_path, "wb") as sink:
            proc = subprocess.run(
                command,
                input=prompt.encode("utf-8"),
                stdout=sink,
                stderr=subprocess.PIPE,
                env=child_env,
            )
        if proc.returncode != 0:
            stderr_text = proc.stderr.decode("utf-8", errors="replace")[:4000]
            sys.exit(
                f"claude CLI failed (rc={proc.returncode})\n"
                f"stderr: {stderr_text}"
            )
        response = Path(capture_path).read_text(encoding="utf-8")
        meta = {"stop_reason": "text-format-no-meta", "chars": len(response)}
        return (response, meta)
    finally:
        # Best-effort cleanup; a file that is already gone is not an error.
        try:
            os.unlink(capture_path)
        except OSError:
            pass
def extract_json_block(s: str) -> str:
    """Return the outermost ``{...}`` span found in a model response.

    If the stripped text starts with a Markdown fence, every line beginning
    with a fence marker is removed first. The result is the text from the
    first ``{`` to the last ``}`` inclusive; when no such span exists the
    stripped text is returned unchanged.
    """
    body = s.strip()
    if body.startswith("```"):
        kept = [ln for ln in body.splitlines() if not ln.startswith("```")]
        body = "\n".join(kept)
    body = body.strip()

    first, last = body.find("{"), body.rfind("}")
    if first < 0 or last <= first:
        return body
    return body[first:last + 1]
def merge_extractions(segments: list[dict], doc_id: str) -> dict:
    """Merge per-segment JSONs into a single doc-level JSON.

    Dedup rules (first occurrence wins for all fields not listed below):
    - events: by (lowercased label, date_start)
    - people / organizations / locations: by lowercased, stripped name;
      entries without a name are skipped
    - relations: by (source_class, lowercased source_name, type,
      target_class, lowercased target_name)

    When duplicates are merged, evidence_chunks are unioned (sorted),
    aliases_in_doc are unioned (minus the entity's own name) when either side
    carries that field, and the highest known confidence is kept.

    Top-level fields (classification, period, summaries) come from the FIRST
    segment with a non-empty value; primary_topics is the union in first-seen
    order; noise_emission / investigative_value take the highest-ranked value
    across segments.

    Args:
        segments: One dict per segment, as produced by the extraction passes.
            The input dicts are not modified.
        doc_id: Document identifier stored in the result.

    Returns:
        The merged doc-level dict with every top-level key present.
    """
    noise_rank = {"none": 0, "low": 1, "medium": 2, "high": 3}
    value_rank = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}
    conf_rank = {"low": 0, "medium": 1, "high": 2}

    out: dict = {
        "doc_id": doc_id,
        "doc_classification": None,
        "doc_classification_note": None,
        "doc_period": None,
        "primary_topics": [],
        "noise_emission": None,
        "investigative_value": None,
        "doc_summary_en": None,
        "doc_summary_pt_br": None,
        "events": [],
        "people": [],
        "organizations": [],
        "locations": [],
        "relations": [],
    }

    def lower_key(s) -> str:
        return (s or "").strip().lower()

    def union_sorted(a: dict, b: dict, field: str) -> set:
        return set(a.get(field) or []) | set(b.get(field) or [])

    # Top-level pickup from first non-null
    for seg in segments:
        for k in ("doc_classification", "doc_classification_note", "doc_period",
                  "doc_summary_en", "doc_summary_pt_br"):
            if not out.get(k) and seg.get(k):
                out[k] = seg[k]

    # primary_topics: union, preserve first-seen order
    seen_topics = set()
    for seg in segments:
        for topic in seg.get("primary_topics") or []:
            if topic and topic not in seen_topics:
                seen_topics.add(topic)
                out["primary_topics"].append(topic)

    # noise / investigative — take the highest-ranked value across segments;
    # values outside the known scales are ignored.
    noise_max, inv_max = None, None
    for seg in segments:
        n = seg.get("noise_emission")
        if n in noise_rank and (noise_max is None or noise_rank[n] > noise_rank[noise_max]):
            noise_max = n
        i = seg.get("investigative_value")
        if i in value_rank and (inv_max is None or value_rank[i] > value_rank[inv_max]):
            inv_max = i
    out["noise_emission"] = noise_max
    out["investigative_value"] = inv_max

    # Entities — dedup by canonical key
    def merge_list(key: str, key_fn):
        merged: dict = {}
        for seg in segments:
            for item in seg.get(key) or []:
                if not isinstance(item, dict):
                    continue
                k = key_fn(item)
                if not k:
                    continue
                existing = merged.get(k)
                if existing is None:
                    # Shallow copy so later merges never write into the
                    # caller's per-segment dicts.
                    merged[k] = dict(item)
                    continue
                existing["evidence_chunks"] = sorted(
                    union_sorted(existing, item, "evidence_chunks"))
                # Only touch aliases when the entity type actually has them,
                # so merged events do not grow an `aliases_in_doc` field.
                if "aliases_in_doc" in existing or "aliases_in_doc" in item:
                    aliases = union_sorted(existing, item, "aliases_in_doc")
                    aliases.discard(existing.get("name", ""))
                    existing["aliases_in_doc"] = sorted(aliases)
                # confidence: keep max; a missing/unknown value ranks below "low"
                new_rank = conf_rank.get(item.get("confidence"), -1)
                if new_rank > conf_rank.get(existing.get("confidence"), -1):
                    existing["confidence"] = item["confidence"]
        return list(merged.values())

    out["events"] = merge_list(
        "events", lambda e: (lower_key(e.get("label")), e.get("date_start") or ""))
    out["people"] = merge_list("people", lambda p: lower_key(p.get("name")))
    out["organizations"] = merge_list("organizations", lambda o: lower_key(o.get("name")))
    out["locations"] = merge_list("locations", lambda loc: lower_key(loc.get("name")))

    # Relations: dedup by (source_class, source_name_lower, type, target_class, target_name_lower)
    rel_seen: dict = {}
    for seg in segments:
        for rel in seg.get("relations") or []:
            if not isinstance(rel, dict):
                continue
            key = (
                rel.get("source_class"),
                lower_key(rel.get("source_name")),
                rel.get("type"),
                rel.get("target_class"),
                lower_key(rel.get("target_name")),
            )
            existing = rel_seen.get(key)
            if existing is None:
                rel_seen[key] = dict(rel)
            else:
                existing["evidence_chunks"] = sorted(
                    union_sorted(existing, rel, "evidence_chunks"))
    out["relations"] = list(rel_seen.values())
    return out
def main() -> int:
    """Re-extract one document end to end and return the process exit code.

    Usage: ``run.py <doc-id>``.

    Steps: build the doc text, split it into segments, run every
    (segment, pass) job through the claude CLI on a thread pool, merge the
    per-segment results, write ``_reextract.json`` under
    ``RAW/<doc-id>--subagent/`` and run the validator on it.

    A pass whose reply is not a JSON object is recorded as failed and its raw
    reply is saved next to the output as
    ``_reextract_raw_seg<N>_<pass>.txt``. Failed passes are summarised on
    stderr at the end.

    Returns:
        The validator's exit code, or 1 when the validator succeeded but at
        least one pass failed (the written JSON is then incomplete).
    """
    if len(sys.argv) < 2:
        sys.exit("usage: run.py <doc-id>")
    doc_id = sys.argv[1]
    out_path = RAW / f"{doc_id}--subagent" / "_reextract.json"

    print("[1/N] Building doc text ...")
    doc_text = build_doc_text(doc_id)
    print(f" {len(doc_text)} chars (~{len(doc_text) // 4} tokens)")

    segments = segment_doc(doc_text)
    n_seg = len(segments)
    print(f"[2/N] Splitting into {n_seg} segment(s) of ~{SEGMENT_INPUT_CHARS // 1000}k chars each")
    for i, s in enumerate(segments, 1):
        print(f" segment {i}: {len(s)} chars")

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_pass(seg_idx: int, seg_text: str, pass_name: str,
                 pass_instr: str) -> tuple[int, str, dict | None, str, str]:
        """Run one pass on one segment.

        Returns (seg_idx, pass_name, piece, error, raw); ``piece`` is None and
        ``error`` is non-empty when the reply is not a JSON object.
        """
        meta_label = f"\n\n[SEGMENT {seg_idx + 1} OF {n_seg}] — extract everything in THIS segment exhaustively.\n"
        prompt = build_full_prompt(doc_id, seg_text, segment_meta=meta_label,
                                   pass_instruction=pass_instr)
        raw, _meta = call_claude(prompt)
        json_text = extract_json_block(raw)
        try:
            piece = json.loads(json_text)
        except json.JSONDecodeError as e:
            return (seg_idx, pass_name, None, f"{e} | raw_len={len(raw)}", raw)
        if not isinstance(piece, dict):
            # Valid JSON but not an object: nothing can be merged from it.
            return (seg_idx, pass_name, None,
                    f"expected a JSON object, got {type(piece).__name__} | raw_len={len(raw)}",
                    raw)
        return (seg_idx, pass_name, piece, "", raw)

    # Fire ALL (segment, pass) jobs in parallel
    extracted: list[dict] = [{"doc_id": doc_id} for _ in range(n_seg)]
    n_jobs = n_seg * len(PASSES)
    print(f"[3/N] Firing {n_jobs} parallel passes ({n_seg} segments × {len(PASSES)} passes)")
    errors: list[str] = []
    completed = 0
    # Cap inner concurrency so outer-WORKERS × inner doesn't fork-bomb the box.
    # 5 = one slot per pass; segments process serially within a doc.
    # Clamped to at least 1: ThreadPoolExecutor rejects max_workers <= 0.
    INNER_MAX = max(1, int(os.environ.get("REEXTRACT_INNER_MAX", "5")))
    # Created up front so raw replies of failed passes can be saved as they arrive.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with ThreadPoolExecutor(max_workers=min(INNER_MAX, n_jobs)) as pool:
        futures = [
            pool.submit(run_pass, seg_idx, seg, pass_name, pass_instr)
            for seg_idx, seg in enumerate(segments)
            for pass_name, pass_instr in PASSES
        ]
        for fut in as_completed(futures):
            seg_idx, pass_name, piece, err, raw = fut.result()
            completed += 1
            tag = f"seg{seg_idx+1}/{pass_name}"
            if err:
                errors.append(f"{tag}: {err}")
                debug = out_path.parent / f"_reextract_raw_seg{seg_idx+1}_{pass_name}.txt"
                try:
                    debug.write_text(raw, encoding="utf-8")
                    saved = f" (raw saved to {debug.name})"
                except OSError as write_err:
                    # The dump is only a debugging aid; do not abort the run.
                    saved = f" (could not save raw reply: {write_err})"
                print(f" [{completed}/{n_jobs}] {tag} FAILED — {err[:120]}{saved}")
            else:
                extracted[seg_idx].update(piece)
                print(f" [{completed}/{n_jobs}] {tag} OK")

    print(f"[4/N] Merging {n_seg} extraction(s) ...")
    merged = merge_extractions(extracted, doc_id) if n_seg > 1 else {**extracted[0], "doc_id": doc_id}
    out_path.write_text(json.dumps(merged, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f" saved {out_path}")

    print("[5/N] Validating ...")
    v = subprocess.run(
        ["python3", str(VALIDATE), doc_id, str(out_path)],
        capture_output=True, text=True, encoding="utf-8",
    )
    print(v.stdout.strip())
    if v.stderr.strip():
        # Surface validator crashes/warnings instead of dropping them.
        print(v.stderr.strip(), file=sys.stderr)

    if errors:
        print(f"{len(errors)} of {n_jobs} pass(es) failed; the saved JSON is incomplete:",
              file=sys.stderr)
        for line in errors:
            print(f" {line}", file=sys.stderr)
        # Never report success for an extraction with missing pieces.
        return v.returncode or 1
    return v.returncode
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())