#!/usr/bin/env python3
"""
validate.py — Validate a Sonnet re-extraction JSON against the closed enums in enums.yaml.

Returns exit 0 if valid; prints errors and exits 1 otherwise.

Run:
    python3 scripts/reextract/validate.py <doc_id> [<json_path>]
"""
from __future__ import annotations

import json
import re
import sys
from pathlib import Path

try:
    import yaml
except ImportError:  # only load_enums() needs PyYAML; validate() works without it
    yaml = None

REX_DIR = Path("/Users/guto/ufo/scripts/reextract")
OUT_DIR = Path("/Users/guto/ufo/raw")

# Compiled once; used with fullmatch() so a trailing newline is rejected.
_CHUNK_ID_RE = re.compile(r"c\d+")
_DATE_RE = re.compile(r"\d{4}(-\d{2}(-\d{2})?)?|XXXX(-XX(-XX)?)?")

_VALID_CLASSES = frozenset(
    {"person", "event", "organization", "location", "uap_object", "document"}
)
# Sorted rendering so the error message is identical from run to run.
_VALID_CLASSES_DESC = "{" + ", ".join(repr(c) for c in sorted(_VALID_CLASSES)) + "}"


def load_enums() -> dict[str, set[str]]:
    """Load ``enums.yaml`` from REX_DIR and return each enum as a set.

    Returns:
        Mapping of enum name to the set of values listed under it. An empty
        file yields an empty mapping; a key with no values yields an empty set.

    Raises:
        ModuleNotFoundError: if PyYAML is not installed.
        OSError: if ``enums.yaml`` cannot be read.
    """
    if yaml is None:
        raise ModuleNotFoundError("PyYAML is required to load enums.yaml")
    raw = yaml.safe_load((REX_DIR / "enums.yaml").read_text(encoding="utf-8"))
    return {k: set(v or ()) for k, v in (raw or {}).items()}


def _load_known_chunks(doc_id: str) -> set[str]:
    """Return the chunk IDs listed in the document's ``_index.json``.

    Best-effort: a missing, unreadable or malformed index yields an empty set
    (which makes the caller skip the chunk-existence check). Unreadable or
    malformed indexes are reported on stderr instead of being ignored silently.
    """
    idx_path = OUT_DIR / f"{doc_id}--subagent" / "_index.json"
    if not idx_path.is_file():
        return set()
    try:
        idx = json.loads(idx_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:  # ValueError covers JSON and decoding errors
        print(f"warning: could not read {idx_path}: {exc}", file=sys.stderr)
        return set()
    chunks = idx.get("chunks", []) if isinstance(idx, dict) else None
    if not isinstance(chunks, list):
        print(f"warning: unexpected structure in {idx_path}", file=sys.stderr)
        return set()
    return {
        c["chunk_id"]
        for c in chunks
        if isinstance(c, dict) and isinstance(c.get("chunk_id"), str)
    }


def validate(data: dict, enums: dict[str, set[str]], doc_id: str) -> list[str]:
    """Check a parsed re-extraction document and return the problems found.

    Args:
        data: The parsed JSON document (expected to be an object).
        enums: Enum name -> allowed values, as returned by ``load_enums``.
        doc_id: The document ID that ``data["doc_id"]`` must equal; also used
            to locate ``_index.json`` under OUT_DIR for the chunk-ID check.

    Returns:
        A list of human-readable error strings; empty when the document is
        valid. Malformed values are reported as errors rather than raising.
    """
    if not isinstance(data, dict):
        return [f"top: expected a JSON object, got {type(data).__name__}"]

    errs: list[str] = []

    def check_enum(value, enum_name: str, ctx: str) -> None:
        # None means "field absent" and is always accepted.
        if value is None:
            return
        try:
            ok = value in enums.get(enum_name, ())
        except TypeError:
            # Unhashable JSON value (list/dict) can never be an enum member.
            ok = False
        if not ok:
            errs.append(f"{ctx}: '{value}' not in enum:{enum_name}")

    def check_list_enum(values, enum_name: str, ctx: str) -> None:
        if not isinstance(values, list):
            return
        for i, v in enumerate(values):
            check_enum(v, enum_name, f"{ctx}[{i}]")

    def as_list(value, ctx: str) -> list:
        # Absent/empty values count as "no entries"; any other non-list is an
        # error instead of being iterated (or crashing when not iterable).
        if not value:
            return []
        if not isinstance(value, list):
            errs.append(f"{ctx}: must be list")
            return []
        return value

    # top-level
    if data.get("doc_id") != doc_id:
        errs.append(f"top: doc_id mismatch: '{data.get('doc_id')}' != '{doc_id}'")
    check_enum(data.get("doc_classification"), "doc_classification", "top.doc_classification")
    check_enum(data.get("noise_emission"), "noise_emission", "top.noise_emission")
    check_enum(data.get("investigative_value"), "investigative_value", "top.investigative_value")
    check_list_enum(data.get("primary_topics"), "primary_topics", "top.primary_topics")

    # known chunk IDs from index — to verify evidence_chunks exist
    known_chunks = _load_known_chunks(doc_id)

    def check_evidence(refs, ctx: str) -> None:
        if not isinstance(refs, list):
            errs.append(f"{ctx}: evidence_chunks must be list")
            return
        if not refs:
            errs.append(f"{ctx}: evidence_chunks empty")
            return
        for r in refs:
            if not isinstance(r, str) or not _CHUNK_ID_RE.fullmatch(r):
                errs.append(f"{ctx}: bad chunk_id '{r}'")
            elif known_chunks and r not in known_chunks:
                errs.append(f"{ctx}: unknown chunk_id '{r}' (not in _index.json)")

    # events
    for i, ev in enumerate(as_list(data.get("events"), "events")):
        ctx = f"events[{i}]"
        if not isinstance(ev, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(ev.get("event_class"), "event_class", f"{ctx}.event_class")
        check_enum(ev.get("date_confidence"), "date_confidence", f"{ctx}.date_confidence")
        check_enum(ev.get("primary_location_geo_class"), "geo_class", f"{ctx}.primary_location_geo_class")
        check_enum(ev.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(ev.get("evidence_chunks"), ctx)
        for j, o in enumerate(as_list(ev.get("observers"), f"{ctx}.observers")):
            check_enum(
                o.get("role_at_event") if isinstance(o, dict) else None,
                "person_class",
                f"{ctx}.observers[{j}].role_at_event",
            )
        for j, u in enumerate(as_list(ev.get("uap_objects_observed"), f"{ctx}.uap_objects_observed")):
            if not isinstance(u, dict):
                continue
            check_enum(u.get("shape"), "uap_shape", f"{ctx}.uap[{j}].shape")
            check_enum(u.get("color"), "uap_color", f"{ctx}.uap[{j}].color")
            check_enum(u.get("medium"), "uap_medium", f"{ctx}.uap[{j}].medium")
        # date format: YYYY[-MM[-DD]] or the XXXX[-XX[-XX]] placeholder
        for k in ("date_start", "date_end"):
            v = ev.get(k)
            if not v:
                continue
            if not isinstance(v, str) or not _DATE_RE.fullmatch(v):
                errs.append(f"{ctx}.{k}: bad date format '{v}'")

    # people
    for i, p in enumerate(as_list(data.get("people"), "people")):
        ctx = f"people[{i}]"
        if not isinstance(p, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(p.get("person_class"), "person_class", f"{ctx}.person_class")
        check_enum(p.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(p.get("evidence_chunks"), ctx)

    # organizations
    for i, o in enumerate(as_list(data.get("organizations"), "organizations")):
        ctx = f"organizations[{i}]"
        if not isinstance(o, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(o.get("org_class"), "org_class", f"{ctx}.org_class")
        check_enum(o.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(o.get("evidence_chunks"), ctx)

    # locations
    for i, loc in enumerate(as_list(data.get("locations"), "locations")):
        ctx = f"locations[{i}]"
        if not isinstance(loc, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(loc.get("geo_class"), "geo_class", f"{ctx}.geo_class")
        check_enum(loc.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(loc.get("evidence_chunks"), ctx)

    # relations
    for i, r in enumerate(as_list(data.get("relations"), "relations")):
        ctx = f"relations[{i}]"
        if not isinstance(r, dict):
            errs.append(f"{ctx}: not object")
            continue
        check_enum(r.get("type"), "relation_type", f"{ctx}.type")
        check_enum(r.get("confidence"), "confidence", f"{ctx}.confidence")
        check_evidence(r.get("evidence_chunks"), ctx)
        for k in ("source_class", "target_class"):
            v = r.get(k)
            # isinstance guard first: an unhashable value must not reach `in`.
            if not isinstance(v, str) or v not in _VALID_CLASSES:
                errs.append(f"{ctx}.{k}: '{v}' not in {_VALID_CLASSES_DESC}")

    return errs


def main() -> int:
    """Command-line entry point.

    Reads the doc ID from ``sys.argv[1]`` and an optional JSON path from
    ``sys.argv[2]`` (default: ``_reextract.json`` in the document's
    ``--subagent`` directory under OUT_DIR), validates the file and prints a
    summary.

    Returns:
        0 when the document is valid, 1 when validation errors were found.
        Usage, missing-file and parse problems exit via ``sys.exit`` with a
        message.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: validate.py <doc_id> [<json_path>]")
    doc_id = sys.argv[1]
    if len(sys.argv) > 2:
        p = Path(sys.argv[2])
    else:
        p = OUT_DIR / f"{doc_id}--subagent" / "_reextract.json"
    if not p.is_file():
        sys.exit(f"json not found: {p}")
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        sys.exit(f"JSON parse error: {e}")

    enums = load_enums()
    errs = validate(data, enums, doc_id)
    if errs:
        print(f"❌ {len(errs)} validation errors for {doc_id}:")
        # Cap the listing so a badly broken file does not flood the terminal.
        for e in errs[:50]:
            print(f" - {e}")
        if len(errs) > 50:
            print(f" ... +{len(errs) - 50} more")
        return 1

    print(f"✓ valid: {doc_id}")
    print(f" events: {len(data.get('events') or [])}")
    print(f" people: {len(data.get('people') or [])}")
    print(f" orgs: {len(data.get('organizations') or [])}")
    print(f" locs: {len(data.get('locations') or [])}")
    print(f" rels: {len(data.get('relations') or [])}")
    return 0


if __name__ == "__main__":
    sys.exit(main())