#!/usr/bin/env python3
"""
04-lint.py — Phase 8 — Lint + backlink rebuild

Scans all .md files in wiki/ and case/ and:
  1. Parses frontmatter
  2. Collects all entity files + all wiki-links
  3. Validates schema:
     - Required universal fields (schema_version, type,
       canonical_title|canonical_name, wiki_version)
     - Type-specific required fields
     - Page sequence continuity per document
     - Evidence grade ↔ chain_of_custody steps
  4. Validates wiki-links: every [[link]] must resolve
  5. Rebuilds mentioned_in[] in entity files (reverse scan from pages)
  6. Reports: orphans, broken links, duplicate canonical names, missing fields
  7. Appends LINT entry to wiki/log.md (only when --fix is given)

Default mode = report-only (read-only safe). Use --fix to write back rebuilt
mentioned_in[] and last_lint timestamps.

Usage:
  ./04-lint.py                  # report only
  ./04-lint.py --fix            # rebuild backlinks + write
  ./04-lint.py --scope wiki     # restrict to wiki/ (skip case/)
  ./04-lint.py --strict         # exit non-zero on any error

The repository root defaults to /Users/guto/ufo and can be overridden with the
UFO_ROOT environment variable.
"""

from __future__ import annotations

import argparse
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

try:
    import yaml
except ImportError:
    sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
    sys.exit(1)

# Root of the knowledge base. The historical hard-coded location remains the
# default so existing invocations keep working unchanged.
UFO_ROOT = Path(os.environ.get("UFO_ROOT", "/Users/guto/ufo"))
WIKI_BASE = UFO_ROOT / "wiki"
CASE_BASE = UFO_ROOT / "case"
LOG_PATH = WIKI_BASE / "log.md"

# ----------------------------------------------------------------------
# Required-field tables
# ----------------------------------------------------------------------

# Fields every frontmatter block must carry, regardless of its `type`.
UNIVERSAL_REQUIRED = ["schema_version", "type", "wiki_version"]

# For most types, at least one of these name fields is required.
# Exceptions are listed in TYPES_WITHOUT_CANONICAL_NAME — they identify
# themselves via a type-specific id (e.g. page_id, log files have no id).
NAME_FIELDS = ["canonical_title", "canonical_name"]
TYPES_WITHOUT_CANONICAL_NAME = {"page"}  # page uses page_id as unique identifier

# type → fields that must be present and non-empty for that type.
TYPE_REQUIRED = {
    "document": ["doc_id", "original_filename", "raw_path", "sha256", "page_count",
                 "collection", "document_class", "content_classification", "pages"],
    "page": ["page_id", "doc_id", "page_number", "png_path", "vision_model",
             "page_type", "content_classification", "entities_extracted"],
    "entity": ["entity_class"],  # plus class-specific id
    "table": ["table_id", "source_doc", "spans_pages"],
    "image": ["image_id", "image_type", "source_page", "bbox_on_page", "vision_description"],
    "evidence": ["evidence_id", "evidence_grade", "evidence_class", "source_page",
                 "chain_of_custody", "supports_claims"],
    "witness_analysis": ["witness_id", "witness_person", "event_witnessed",
                         "statements", "verdict"],
    "timeline": ["timeline_scope", "period", "entries"],
    "hypothesis": ["hypothesis_id", "hypothesis_class", "status", "falsification_tests",
                   "evidence_for", "evidence_against"],
    "actor_profile": ["actor_profile_id", "actor", "motive", "means", "opportunity",
                      "modus_operandi"],
    "gap": ["gap_id", "gap_class", "description", "detected_in", "severity"],
    "relation": ["relation_id", "relation_class", "nodes", "connection_description",
                 "confidence_band"],
    "case_report": ["case_id", "chapters", "quality_rubrics", "overall_quality_score"],
    "residual_uncertainty": ["unknowns_known", "calibration_table",
                             "what_would_change_conclusion"],
    "index": ["stats", "hubs"],
    "log": [],
}

# entity_class → name of the id field that class must carry.
ENTITY_CLASS_ID = {
    "person": "person_id",
    "organization": "organization_id",
    "location": "location_id",
    "event": "event_id",
    "uap_object": "uap_object_id",
    "vehicle": "vehicle_id",
    "operation": "operation_id",
    "concept": "concept_id",
}

# wiki-link namespace → directory under UFO_ROOT
NAMESPACE_DIR = {
    "people": "wiki/entities/people",
    "org": "wiki/entities/organizations",
    "loc": "wiki/entities/locations",
    "event": "wiki/entities/events",
    "uap": "wiki/entities/uap-objects",
    "vehicle": "wiki/entities/vehicles",
    "op": "wiki/entities/operations",
    "concept": "wiki/entities/concepts",
    "table": "wiki/tables",
    "image": "wiki/images",
    "evidence": "case/evidence",
    "witness": "case/witnesses",
    "hypothesis": "case/hypotheses",
    "profile": "case/profiles",
    "gap": "case/gaps",
    "relation": "case/connect-the-dots",
    "case": "case",  # for [[case/case-report]], [[case/residual-uncertainty]]
}

# [[target]] or [[target|label]]; only the target is captured.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]")

# Frontmatter = an opening `---` line, YAML text, and a closing `---` that sits
# on a line of its own. Anchoring the closing delimiter to a full line keeps a
# `---` that merely appears inside a YAML value from truncating the block.
_FRONTMATTER_RE = re.compile(
    r"\A---[ \t]*\r?\n(.*?)^---[ \t]*(?:\r?\n|\Z)",
    re.DOTALL | re.MULTILINE,
)

# Minimum number of chain_of_custody steps demanded by each evidence grade.
_MIN_CUSTODY_STEPS = {"A": 3, "B": 2, "C": 1}


def utc_now_iso() -> str:
    """Return the current UTC time as `YYYY-MM-DDTHH:MM:SSZ`."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into (frontmatter, body).

    Returns `({}, content)` when the file has no frontmatter block. When the
    block cannot be parsed, or parses to something other than a mapping, the
    returned dict holds a single `_yaml_error` key so callers can report it
    instead of crashing on `.get`.
    """
    content = path.read_text(encoding="utf-8")
    match = _FRONTMATTER_RE.match(content)
    if match is None:
        return {}, content

    body = content[match.end():].lstrip("\n")
    try:
        fm = yaml.safe_load(match.group(1))
    except yaml.YAMLError as e:
        return {"_yaml_error": str(e)}, body

    if fm is None:  # empty frontmatter block
        return {}, body
    if not isinstance(fm, dict):
        kind = type(fm).__name__
        return {"_yaml_error": f"frontmatter is a {kind}, expected a mapping"}, body
    return fm, body


def write_md(path: Path, fm: dict, body: str) -> bool:
    """Serialize `fm` + `body` to `path`; return True only if the file changed."""
    yaml_str = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    # Keep exactly one blank line between the closing delimiter and the body.
    separator = "" if body.startswith("\n") else "\n"
    new_content = f"---\n{yaml_str}---\n{separator}{body}"
    if path.exists() and path.read_text(encoding="utf-8") == new_content:
        return False
    path.write_text(new_content, encoding="utf-8")
    return True


def iter_md_files(scope: str) -> list[Path]:
    """List all .md files under wiki/ and/or case/, sorted by path."""
    out: list[Path] = []
    if scope in ("wiki", "all"):
        out.extend(WIKI_BASE.rglob("*.md"))
    if scope in ("case", "all"):
        out.extend(CASE_BASE.rglob("*.md"))
    return sorted(out)


def resolve_link(target: str) -> tuple[str, Path | None]:
    """Resolve a wiki-link target to a filesystem path.

    Returns (kind, path). `kind` is "page", a NAMESPACE_DIR key, or
    "document" for anything else (treated as a bare doc_id).
    """
    target = target.strip()

    # Page link: <doc_id>/pNNN
    m = re.match(r"^([a-z0-9][a-z0-9-]*)/p(\d{3})$", target)
    if m:
        doc_id, padded = m.group(1), m.group(2)
        return ("page", UFO_ROOT / "wiki" / "pages" / doc_id / f"p{padded}.md")

    # Namespaced link: <namespace>/<rest>
    if "/" in target:
        ns, rest = target.split("/", 1)
        if ns in NAMESPACE_DIR:
            return (ns, UFO_ROOT / NAMESPACE_DIR[ns] / f"{rest}.md")

    # Bare doc_id
    candidate = UFO_ROOT / "wiki" / "documents" / f"{target}.md"
    return ("document", candidate)


def collect_inventory(scope: str) -> dict:
    """Walk all .md files; return inventory of frontmatters and wiki-links."""
    files = iter_md_files(scope)
    inv = {
        "files": [],
        "by_path": {},
        "links_out": defaultdict(list),   # source_path → [{target, kind, resolved}]
        "links_in": defaultdict(list),    # target_path_str → [source_path]
        "entity_files": {},               # canonical_id → path (for dedup detection)
        "canonical_name_index": defaultdict(list),  # name → [paths]
        "page_files_by_doc": defaultdict(list),     # doc_id → [(page_num, path)]
    }
    for path in files:
        fm, body = read_md(path)
        rel = path.relative_to(UFO_ROOT)
        inv["files"].append(path)
        inv["by_path"][str(path)] = {"fm": fm, "body": body, "rel": rel}

        if fm.get("type") == "page":
            doc_id = fm.get("doc_id", "")
            page_num = fm.get("page_number")
            if doc_id and isinstance(page_num, int):
                inv["page_files_by_doc"][doc_id].append((page_num, path))

        # Track canonical name uniqueness
        cname = fm.get("canonical_name") or fm.get("canonical_title")
        if cname:
            inv["canonical_name_index"][cname].append(path)

        # Find all wiki-links in body
        for match in WIKI_LINK_RE.findall(body):
            kind, resolved = resolve_link(match)
            inv["links_out"][str(path)].append(
                {"target": match, "kind": kind, "resolved": resolved}
            )
            if resolved is not None:
                inv["links_in"][str(resolved)].append(path)
    return inv


def validate_required_fields(fm: dict, path: Path) -> list[str]:
    """Return the list of schema errors for one frontmatter dict.

    `path` is accepted for interface compatibility; the checks only look at
    `fm`. A frontmatter carrying `_yaml_error` yields just that one error.
    """
    # YAML parse error: nothing else can be checked.
    if "_yaml_error" in fm:
        return [f"yaml-parse-error: {fm['_yaml_error']}"]

    errors: list[str] = []

    # Only string types can be looked up in the tables below; anything else
    # (e.g. a YAML list) is treated like an unknown type instead of raising.
    raw_type = fm.get("type")
    t = raw_type if isinstance(raw_type, str) else None

    # Universal
    for f in UNIVERSAL_REQUIRED:
        if f not in fm:
            errors.append(f"missing-universal-field: {f}")
    if t not in TYPES_WITHOUT_CANONICAL_NAME:
        if not any(k in fm for k in NAME_FIELDS):
            errors.append(f"missing-name-field: need one of {NAME_FIELDS}")

    # Type-specific: the field must exist and be neither null nor an empty list.
    for f in TYPE_REQUIRED.get(t, []):
        if f not in fm or fm[f] is None or fm[f] == []:
            errors.append(f"missing-{t}-field: {f}")

    # Entity-specific id field
    if t == "entity":
        cls = fm.get("entity_class")
        if isinstance(cls, str) and cls in ENTITY_CLASS_ID:
            id_field = ENTITY_CLASS_ID[cls]
            if id_field not in fm:
                errors.append(f"missing-entity-id: {id_field} for entity_class={cls}")
        else:
            errors.append(f"unknown-entity-class: {cls!r}")

    # Evidence: grade A → ≥3 custody, B → ≥2, C → ≥1
    if t == "evidence":
        grade = fm.get("evidence_grade")
        custody = fm.get("chain_of_custody")
        if isinstance(custody, (list, tuple, dict)):
            steps = len(custody)
        else:
            # A scalar is one step at most; never count its characters.
            steps = 1 if custody else 0
        min_steps = _MIN_CUSTODY_STEPS.get(grade, 0) if isinstance(grade, str) else 0
        if steps < min_steps:
            errors.append(
                f"evidence-grade-{grade}-needs-{min_steps}-custody-steps (has {steps})"
            )

    # Hypothesis posterior > 0.50 → ≥2 evidence_for
    if t == "hypothesis":
        post = fm.get("posterior_probability") or 0
        if isinstance(post, (int, float)) and post > 0.50:
            ev_for = fm.get("evidence_for") or []
            if len(ev_for) < 2:
                errors.append(
                    f"hypothesis-posterior-{post}-needs-2-evidence_for (has {len(ev_for)})"
                )

    return errors


def validate_page_sequences(inv: dict) -> list[str]:
    """For each document, pages must be 1..page_count contiguous."""
    errors = []
    for info in inv["by_path"].values():
        fm = info["fm"]
        if fm.get("type") != "document":
            continue
        doc_id = fm.get("doc_id")
        page_count = fm.get("page_count")
        if not doc_id or not isinstance(page_count, int):
            continue
        actual_nums = {n for n, _ in inv["page_files_by_doc"].get(doc_id, [])}
        expected = set(range(1, page_count + 1))
        missing = expected - actual_nums
        extra = actual_nums - expected
        if missing or extra:
            errors.append(
                f"doc {doc_id}: page sequence broken "
                f"(missing={sorted(missing)}, extra={sorted(extra)})"
            )
    return errors


def validate_canonical_uniqueness(inv: dict) -> list[str]:
    """Two distinct files cannot share canonical_name without disambiguation_note."""
    errors = []
    for name, paths in inv["canonical_name_index"].items():
        if len(paths) <= 1:
            continue
        # Allow duplicates if ALL files declare disambiguation_note
        all_have_note = all(
            inv["by_path"][str(p)]["fm"].get("disambiguation_note") for p in paths
        )
        if not all_have_note:
            rels = [str(p.relative_to(UFO_ROOT)) for p in paths]
            errors.append(f"duplicate-canonical-name {name!r}: in {rels}")
    return errors


def validate_links(inv: dict) -> tuple[list[str], list[str]]:
    """Check that every wiki-link resolves.

    Returns (broken, warned). `warned` is currently always empty; it is kept
    so the caller's two-value unpacking stays valid.
    """
    broken = []
    warned = []
    for source_path_str, links in inv["links_out"].items():
        source_rel = Path(source_path_str).relative_to(UFO_ROOT)
        for ln in links:
            target_path = ln["resolved"]
            if target_path is None:
                broken.append(f"unparseable-link in {source_rel}: [[{ln['target']}]]")
                continue
            if not target_path.exists():
                broken.append(
                    f"broken-link in {source_rel}: [[{ln['target']}]] → "
                    f"{target_path.relative_to(UFO_ROOT)}"
                )
    return broken, warned


def detect_orphans(inv: dict) -> list[str]:
    """Entity files with zero inbound links (not referenced anywhere)."""
    orphans = []
    for path_str, info in inv["by_path"].items():
        if info["fm"].get("type") != "entity":
            continue
        path = Path(path_str)
        if not inv["links_in"].get(str(path)):
            orphans.append(f"orphan: {path.relative_to(UFO_ROOT)}")
    return orphans


# ----------------------------------------------------------------------
# Backlink rebuild
# ----------------------------------------------------------------------

def _canonicalize_name(name: str) -> str:
    """Same algorithm used by script 03 (kebab-case ASCII-fold).

    Falsy input yields "". Non-string values (YAML may load an id as an int)
    are converted with `str()` first instead of raising TypeError.
    """
    import unicodedata as ud

    text = str(name) if name else ""
    nfkd = ud.normalize("NFKD", text)
    ascii_str = "".join(c for c in nfkd if not ud.combining(c))
    lower = ascii_str.lower()
    replaced = re.sub(r"[^a-z0-9-]", "-", lower)
    collapsed = re.sub(r"-+", "-", replaced).strip("-")
    # Slugs never start with a digit; prefix them instead.
    if collapsed and collapsed[0].isdigit():
        collapsed = "x-" + collapsed
    return collapsed


# entities_extracted key on a page → entity_class of the matching entity file.
PAGE_CLASS_TO_ENTITY_CLASS = {
    "people": "person",
    "organizations": "organization",
    "locations": "location",
    "vehicles": "vehicle",
    "operations": "operation",
    "concepts": "concept",
}


def _build_alias_index(inv: dict) -> dict[tuple[str, str], Path]:
    """Build {(entity_class, alias_key): entity_path} where alias_key is the
    canonicalized form of every name/alias/canonical_name/concept_id under that entity.

    Used to resolve free-text entity names extracted by Haiku back to the
    curated entity file (which may have a friendlier canonical_id). When two
    entities of the same class share a key, the one visited last wins.
    """
    index: dict[tuple[str, str], Path] = {}
    for path_str, info in inv["by_path"].items():
        fm = info["fm"]
        if fm.get("type") != "entity":
            continue
        ec = fm.get("entity_class")
        if not ec or not isinstance(ec, str):
            continue

        keys: set[str] = set()
        # canonical name + aliases + canonical_id itself
        cname = fm.get("canonical_name")
        if cname:
            keys.add(_canonicalize_name(cname))
        for alias in (fm.get("aliases") or []):
            if isinstance(alias, str):
                keys.add(_canonicalize_name(alias))
        id_field = ENTITY_CLASS_ID.get(ec)
        if id_field and id_field in fm:
            keys.add(_canonicalize_name(fm[id_field]))
        # Also include filename stem
        keys.add(_canonicalize_name(Path(path_str).stem))

        for key in keys:
            if key:
                index[(ec, key)] = Path(path_str)
    return index


def rebuild_backlinks(inv: dict, dry_run: bool) -> tuple[int, int]:
    """For each entity file, materialize mentioned_in[] from page entities_extracted.

    Resolution of "free-text entity name from Haiku" → "curated entity file"
    uses the alias index (canonical_name + aliases + canonical_id all match).
    With `dry_run` true nothing is written; entities that would change are
    still counted as updated.

    Returns (entities_updated, entities_unchanged).
    """
    updated = unchanged = 0
    alias_index = _build_alias_index(inv)
    backlinked_classes = set(PAGE_CLASS_TO_ENTITY_CLASS.values())

    # entity_file_path → list[(page_id, role)]
    mentions_by_entity: dict[str, list[tuple[str, str]]] = defaultdict(list)

    for info in inv["by_path"].values():
        fm = info["fm"]
        if fm.get("type") != "page":
            continue
        page_id = fm.get("page_id")
        if not page_id or not isinstance(page_id, str):
            # Without an id the backlink would be the unresolvable "[[]]".
            continue
        ents = fm.get("entities_extracted")
        if not isinstance(ents, dict):
            continue
        for cls, entries in ents.items():
            ec = PAGE_CLASS_TO_ENTITY_CLASS.get(cls)
            if ec is None or not isinstance(entries, (list, tuple)):
                continue
            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                name = entry.get("name")
                if not name:
                    continue
                target = alias_index.get((ec, _canonicalize_name(name)))
                if not target:
                    continue
                # Only people carry a per-page role; everything else is a plain mention.
                role = entry.get("role_in_page", "mentioned") if cls == "people" else "mentioned"
                mentions_by_entity[str(target)].append((page_id, role))

    # Walk all entities and write mentioned_in[]
    for path_str, info in inv["by_path"].items():
        fm = info["fm"]
        if fm.get("type") != "entity":
            continue
        if fm.get("entity_class") not in backlinked_classes:
            # event, uap_object: their links come via documented_in/observed_in_event,
            # not page entities_extracted
            continue

        per_page: dict[str, dict] = {}
        for page_id, role in mentions_by_entity.get(path_str, []):
            if page_id not in per_page:
                # The role of the first mention on a page is the one recorded.
                per_page[page_id] = {
                    "page": f"[[{page_id}]]",
                    "mention_count": 0,
                    "role_in_page": role,
                }
            per_page[page_id]["mention_count"] += 1

        mentioned_in = sorted(per_page.values(), key=lambda x: -x["mention_count"])
        total = sum(x["mention_count"] for x in mentioned_in)
        unique_docs = {pg.split("/", 1)[0] for pg in per_page}

        new_fm = dict(fm)
        new_fm["mentioned_in"] = mentioned_in
        new_fm["total_mentions"] = total
        new_fm["documents_count"] = len(unique_docs)

        # Idempotency: only bump last_lint if the substantive data changed
        snapshot_prev = {k: v for k, v in fm.items() if k != "last_lint"}
        snapshot_new = {k: v for k, v in new_fm.items() if k != "last_lint"}
        if snapshot_prev == snapshot_new:
            unchanged += 1
            continue

        new_fm["last_lint"] = utc_now_iso()
        if dry_run:
            updated += 1
        elif write_md(Path(path_str), new_fm, info["body"]):
            updated += 1
        else:
            unchanged += 1

    return updated, unchanged


# ----------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------

def main() -> None:
    """CLI entry point: scan, validate, rebuild backlinks, print the report.

    Files and wiki/log.md are only written when --fix is given. With --strict
    the process exits with status 1 if any error was found.
    """
    ap = argparse.ArgumentParser(description="Lint wiki/case + rebuild backlinks.")
    ap.add_argument("--scope", choices=["wiki", "case", "all"], default="all",
                    help="scope to scan")
    ap.add_argument("--fix", action="store_true",
                    help="actually rewrite backlinks (default = report only)")
    ap.add_argument("--strict", action="store_true",
                    help="exit non-zero on any error")
    args = ap.parse_args()

    print(f"Scanning scope={args.scope}...", flush=True)
    inv = collect_inventory(args.scope)
    print(f" files: {len(inv['files'])}", flush=True)

    all_errors: list[str] = []
    all_warnings: list[str] = []

    # 1. Required fields
    field_errors = []
    for path_str, info in inv["by_path"].items():
        rel = Path(path_str).relative_to(UFO_ROOT)
        for err in validate_required_fields(info["fm"], Path(path_str)):
            field_errors.append(f"{rel}: {err}")
    all_errors.extend(field_errors)

    # 2. Page sequence
    page_errors = validate_page_sequences(inv)
    all_errors.extend(page_errors)

    # 3. Canonical uniqueness
    name_errors = validate_canonical_uniqueness(inv)
    all_errors.extend(name_errors)

    # 4. Links
    broken, link_warnings = validate_links(inv)
    all_errors.extend(broken)
    all_warnings.extend(link_warnings)

    # 5. Orphans (warning, not error)
    orphans = detect_orphans(inv)
    all_warnings.extend(orphans)

    # 6. Rebuild backlinks
    updated, unchanged = rebuild_backlinks(inv, dry_run=not args.fix)

    # Report
    print("\n=== LINT REPORT ===")
    print(f" files scanned: {len(inv['files'])}")
    print(f" errors: {len(all_errors)}")
    for e in all_errors[:50]:
        print(f" ✗ {e}")
    if len(all_errors) > 50:
        print(f" … and {len(all_errors) - 50} more")
    print(f" warnings: {len(all_warnings)}")
    for w in all_warnings[:20]:
        print(f" ⚠ {w}")
    if len(all_warnings) > 20:
        print(f" … and {len(all_warnings) - 20} more")
    action = "would-update" if not args.fix else "updated"
    print(f" backlinks: {action}={updated}, unchanged={unchanged}")

    # Log entry
    if args.fix:
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(f"\n## {utc_now_iso()} — LINT (Phase 8)\n")
            fh.write(f"- operator: archivist\n- scope: {args.scope}\n- files_scanned: {len(inv['files'])}\n")
            fh.write(f"- errors: {len(all_errors)}\n- warnings: {len(all_warnings)}\n")
            fh.write(f"- backlinks_updated: {updated}\n- backlinks_unchanged: {unchanged}\n")
            if all_errors:
                fh.write("- top_errors:\n")
                for e in all_errors[:10]:
                    fh.write(f" - {e}\n")

    if args.strict and all_errors:
        sys.exit(1)


if __name__ == "__main__":
    main()