disclosure-bureau/scripts/reextract/validate.py
Luiz Gustavo a7e9dce6d2 rebuild entity layer from Sonnet-vision reextract pipeline
Add reextract pipeline (scripts/reextract/) that rebuilds doc-level entity
JSON from Sonnet-vision chunks via Opus, replacing the noisy per-page
extraction. Add synthesize scripts to regenerate wiki/entities from the 116
_reextract.json (30), aggregate missing page.md from chunks (31), and reprocess
805 pages the doc-rebuilder agent dropped on context overflow (32). Add
maintain scripts 43-56 for chunk-page sync, dedup, generic-entity marking, and
typed relation extraction.

Web: wire relations API + entity-relations component; entity/timeline/doc
pages consume the rebuilt layer.

Note: raw/, processing/, wiki/ remain gitignored (bulk data managed
separately); the 116 reextract JSONs and 7,798 rebuilt entity files live on
disk only. The 27 curated anchor events under wiki/entities/events/ are
preserved.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 12:20:24 -03:00

169 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
validate.py — Validate a Sonnet re-extraction JSON against the closed enums
in enums.yaml. Returns exit 0 if valid; prints errors and exits 1 otherwise.
Run:
python3 scripts/reextract/validate.py <doc-id> [<json-path>]
"""
from __future__ import annotations
import json
import re
import sys
from pathlib import Path
import yaml
# Directory that holds enums.yaml, the closed-enum definitions read by load_enums().
# NOTE(review): absolute, machine-specific path — confirm before running elsewhere.
REX_DIR = Path("/Users/guto/ufo/scripts/reextract")
# Root under which each document's "<doc-id>--subagent" directory is looked up;
# that directory is where _index.json and the default _reextract.json are read from.
OUT_DIR = Path("/Users/guto/ufo/raw")
def load_enums() -> dict[str, set[str]]:
    """Load enums.yaml from REX_DIR.

    Returns:
        A dict mapping every top-level key of the YAML document to a set
        built from that key's value.
    """
    enums_text = (REX_DIR / "enums.yaml").read_text(encoding="utf-8")
    parsed = yaml.safe_load(enums_text)
    enums: dict[str, set[str]] = {}
    for enum_name, members in parsed.items():
        enums[enum_name] = set(members)
    return enums
# Shape of a chunk reference such as "c12". ASCII digits only; applied with
# fullmatch so a trailing newline cannot slip past the check.
_CHUNK_ID_RE = re.compile(r"c[0-9]+")
# Dates at year, year-month or year-month-day precision, or the all-unknown
# placeholders XXXX, XXXX-XX and XXXX-XX-XX.
_DATE_RE = re.compile(r"[0-9]{4}(-[0-9]{2}(-[0-9]{2})?)?|XXXX(-XX(-XX)?)?")
# Entity classes accepted as source_class / target_class of a relation.
_RELATION_ENDPOINT_CLASSES = frozenset(
    {"person", "event", "organization", "location", "uap_object", "document"}
)


def _load_known_chunks(idx_path: Path) -> set[str]:
    """Return the string chunk IDs listed in an _index.json, best-effort.

    A missing, unreadable or unexpectedly shaped index yields an empty set,
    which makes the caller skip chunk-existence checks. Read/parse failures
    are reported on stderr instead of being swallowed silently.
    """
    if not idx_path.is_file():
        return set()
    try:
        idx = json.loads(idx_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:  # ValueError covers JSON and UTF-8 decode errors
        print(
            f"warning: cannot read {idx_path}: {exc}; skipping chunk existence checks",
            file=sys.stderr,
        )
        return set()
    chunks = idx.get("chunks") if isinstance(idx, dict) else None
    if not isinstance(chunks, list):
        return set()
    # Skip malformed entries individually so one bad entry does not disable
    # the existence check for every well-formed chunk.
    return {
        c["chunk_id"]
        for c in chunks
        if isinstance(c, dict) and isinstance(c.get("chunk_id"), str)
    }


def validate(data: dict, enums: dict[str, set[str]], doc_id: str) -> list[str]:
    """Check a parsed re-extraction document against the closed enums.

    Malformed input (wrong JSON types, unhashable values, non-string dates)
    is reported as a validation error rather than raising.

    Args:
        data: Parsed JSON document; anything other than a dict is rejected
            with a single error.
        enums: Mapping of enum name to its allowed values.
        doc_id: ID that ``data["doc_id"]`` must equal. Also used to locate
            ``OUT_DIR/<doc_id>--subagent/_index.json``; when that index is
            readable, every evidence chunk must be listed in it.

    Returns:
        Human-readable error strings, each prefixed with the path of the
        offending field. Empty when the document is valid.
    """
    if not isinstance(data, dict):
        return [f"top: document must be a JSON object, got {type(data).__name__}"]

    errs: list[str] = []

    def check_enum(value, enum_name: str, ctx: str) -> None:
        # None means "field absent" and is always accepted.
        if value is None:
            return
        try:
            allowed = value in enums.get(enum_name, set())
        except TypeError:
            # Unhashable values (list/dict) can never be enum members.
            allowed = False
        if not allowed:
            errs.append(f"{ctx}: '{value}' not in enum:{enum_name}")

    def check_list_enum(values, enum_name: str, ctx: str) -> None:
        # Non-list values are ignored here, as in the per-item checks below.
        if not isinstance(values, list):
            return
        for i, v in enumerate(values):
            check_enum(v, enum_name, f"{ctx}[{i}]")

    def section(name: str) -> list:
        # Top-level entity sections: absent/empty is fine, but a non-list
        # value is an error (and must not be iterated).
        value = data.get(name)
        if not value:
            return []
        if not isinstance(value, list):
            errs.append(f"{name}: must be list")
            return []
        return value

    def as_list(value) -> list:
        # Nested optional lists: anything that is not a list is skipped.
        return value if isinstance(value, list) else []

    # top-level
    if data.get("doc_id") != doc_id:
        errs.append(f"top: doc_id mismatch: '{data.get('doc_id')}' != '{doc_id}'")
    check_enum(data.get("doc_classification"), "doc_classification", "top.doc_classification")
    check_enum(data.get("noise_emission"), "noise_emission", "top.noise_emission")
    check_enum(data.get("investigative_value"), "investigative_value", "top.investigative_value")
    check_list_enum(data.get("primary_topics"), "primary_topics", "top.primary_topics")

    # known chunk IDs from index — to verify evidence_chunks exist
    known_chunks = _load_known_chunks(OUT_DIR / f"{doc_id}--subagent" / "_index.json")

    def check_evidence(refs, ctx: str) -> None:
        if not isinstance(refs, list):
            errs.append(f"{ctx}: evidence_chunks must be list")
            return
        if not refs:
            errs.append(f"{ctx}: evidence_chunks empty")
            return
        for r in refs:
            if not isinstance(r, str) or not _CHUNK_ID_RE.fullmatch(r):
                errs.append(f"{ctx}: bad chunk_id '{r}'")
            elif known_chunks and r not in known_chunks:
                errs.append(f"{ctx}: unknown chunk_id '{r}' (not in _index.json)")

    # events
    for i, ev in enumerate(section("events")):
        ctx = f"events[{i}]"
        if not isinstance(ev, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(ev.get("event_class"), "event_class", f"{ctx}.event_class")
        check_enum(ev.get("date_confidence"), "date_confidence", f"{ctx}.date_confidence")
        check_enum(ev.get("primary_location_geo_class"), "geo_class", f"{ctx}.primary_location_geo_class")
        check_enum(ev.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(ev.get("evidence_chunks"), ctx)
        for j, o in enumerate(as_list(ev.get("observers"))):
            if isinstance(o, dict):
                check_enum(o.get("role_at_event"), "person_class",
                           f"{ctx}.observers[{j}].role_at_event")
        for j, u in enumerate(as_list(ev.get("uap_objects_observed"))):
            if not isinstance(u, dict):
                continue
            check_enum(u.get("shape"), "uap_shape", f"{ctx}.uap[{j}].shape")
            check_enum(u.get("color"), "uap_color", f"{ctx}.uap[{j}].color")
            check_enum(u.get("medium"), "uap_medium", f"{ctx}.uap[{j}].medium")
        # date format (falsy values mean "no date" and are accepted)
        for k in ("date_start", "date_end"):
            v = ev.get(k)
            if v and not (isinstance(v, str) and _DATE_RE.fullmatch(v)):
                errs.append(f"{ctx}.{k}: bad date format '{v}'")

    # people, organizations, locations share one shape:
    # a class enum, a confidence enum and mandatory evidence chunks.
    for name, class_key, class_enum in (
        ("people", "person_class", "person_class"),
        ("organizations", "org_class", "org_class"),
        ("locations", "geo_class", "geo_class"),
    ):
        for i, item in enumerate(section(name)):
            ctx = f"{name}[{i}]"
            if not isinstance(item, dict):
                errs.append(f"{ctx}: not object")
                continue
            check_enum(item.get(class_key), class_enum, f"{ctx}.{class_key}")
            check_enum(item.get("confidence"), "confidence", f"{ctx}.confidence")
            check_evidence(item.get("evidence_chunks"), ctx)

    # relations
    # Render the allowed classes in sorted order so the message is stable
    # across runs (a raw set repr depends on string hash randomization).
    allowed_repr = "{" + ", ".join(repr(c) for c in sorted(_RELATION_ENDPOINT_CLASSES)) + "}"
    for i, r in enumerate(section("relations")):
        ctx = f"relations[{i}]"
        if not isinstance(r, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(r.get("type"), "relation_type", f"{ctx}.type")
        check_enum(r.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(r.get("evidence_chunks"), ctx)
        for k in ("source_class", "target_class"):
            v = r.get(k)
            # Both endpoints are required; the isinstance test also keeps
            # unhashable values away from the set membership check.
            if not isinstance(v, str) or v not in _RELATION_ENDPOINT_CLASSES:
                errs.append(f"{ctx}.{k}: '{v}' not in {allowed_repr}")
    return errs
def main() -> int:
    """Command-line entry point: validate one re-extraction JSON file.

    Reads ``<doc-id>`` and an optional JSON path from ``sys.argv``. Without an
    explicit path, ``OUT_DIR/<doc-id>--subagent/_reextract.json`` is used.

    Returns:
        0 when the document is valid (a short summary is printed), 1 when
        validation errors were found (at most the first 50 are printed).

    Raises:
        SystemExit: with a message when the arguments are missing, the JSON
            file does not exist, or the file is not parseable JSON.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        sys.exit("usage: validate.py <doc-id> [<json-path>]")

    doc_id = cli_args[0]
    if len(cli_args) > 1:
        source = Path(cli_args[1])
    else:
        source = OUT_DIR / f"{doc_id}--subagent" / "_reextract.json"
    if not source.is_file():
        sys.exit(f"json not found: {source}")

    try:
        data = json.loads(source.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        sys.exit(f"JSON parse error: {e}")

    problems = validate(data, load_enums(), doc_id)
    if problems:
        hidden = len(problems) - 50
        print(f"{len(problems)} validation errors for {doc_id}:")
        for problem in problems[:50]:
            print(f" - {problem}")
        if hidden > 0:
            print(f" ... +{hidden} more")
        return 1

    # Success: one status line followed by per-section entity counts.
    summary = [
        f"✓ valid: {doc_id}",
        f" events: {len(data.get('events') or [])}",
        f" people: {len(data.get('people') or [])}",
        f" orgs: {len(data.get('organizations') or [])}",
        f" locs: {len(data.get('locations') or [])}",
        f" rels: {len(data.get('relations') or [])}",
    ]
    print("\n".join(summary))
    return 0


if __name__ == "__main__":
    sys.exit(main())