disclosure-bureau/scripts/04-lint.py

545 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
"""
04-lint.py — Phase 8 — Lint + backlink rebuild
Scans all .md files in wiki/ and case/ and:
1. Parses frontmatter
2. Collects all entity files + all wiki-links
3. Validates schema:
- Required universal fields (schema_version, type, canonical_title|canonical_name, wiki_version)
- Type-specific required fields
- Page sequence continuity per document
- Evidence grade ↔ chain_of_custody steps
4. Validates wiki-links: every [[link]] must resolve
5. Rebuilds mentioned_in[] in entity files (reverse scan from pages)
6. Reports: orphans, broken links, duplicate canonical names, missing fields
7. Appends LINT entry to wiki/log.md
Default mode = report-only (read-only safe). Use --fix to write back rebuilt
mentioned_in[] and last_lint timestamps.
Usage:
./04-lint.py # report only
./04-lint.py --fix # rebuild backlinks + write
./04-lint.py --scope wiki # restrict to wiki/ (skip case/)
./04-lint.py --strict # exit non-zero on any error
"""
from __future__ import annotations
import argparse
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
try:
import yaml
except ImportError:
sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
sys.exit(1)
# Filesystem layout. Every scanned file lives under UFO_ROOT, and reported
# paths are printed relative to it.
# NOTE(review): UFO_ROOT is a hard-coded absolute path — the script cannot run
# against another checkout without editing this line.
UFO_ROOT = Path("/Users/guto/ufo")
# Scanned when --scope is "wiki" or "all".
WIKI_BASE = UFO_ROOT / "wiki"
# Scanned when --scope is "case" or "all".
CASE_BASE = UFO_ROOT / "case"
# main() appends a LINT entry to this file, but only when --fix is given.
LOG_PATH = WIKI_BASE / "log.md"
# ----------------------------------------------------------------------
# Required-field tables
# ----------------------------------------------------------------------
# Fields every frontmatter must contain, whatever its `type`
# (validate_required_fields only checks key presence for these).
UNIVERSAL_REQUIRED = ["schema_version", "type", "wiki_version"]
# For most types, at least one of these name fields is required.
# Types listed in TYPES_WITHOUT_CANONICAL_NAME are exempt because they
# identify themselves through a type-specific id instead (e.g. page_id).
# NOTE(review): "log" is NOT in that set, so log files are still required to
# carry a name field — confirm whether that is intended.
NAME_FIELDS = ["canonical_title", "canonical_name"]
TYPES_WITHOUT_CANONICAL_NAME = {"page"} # page uses page_id as unique identifier
# type → fields that must be present and be neither None nor an empty list.
# Types missing from this table get no type-specific check.
TYPE_REQUIRED = {
"document": ["doc_id", "original_filename", "raw_path", "sha256", "page_count", "collection", "document_class", "content_classification", "pages"],
"page": ["page_id", "doc_id", "page_number", "png_path", "vision_model", "page_type", "content_classification", "entities_extracted"],
"entity": ["entity_class"], # plus class-specific id
"table": ["table_id", "source_doc", "spans_pages"],
"image": ["image_id", "image_type", "source_page", "bbox_on_page", "vision_description"],
"evidence": ["evidence_id", "evidence_grade", "evidence_class", "source_page", "chain_of_custody", "supports_claims"],
"witness_analysis": ["witness_id", "witness_person", "event_witnessed", "statements", "verdict"],
"timeline": ["timeline_scope", "period", "entries"],
"hypothesis": ["hypothesis_id", "hypothesis_class", "status", "falsification_tests", "evidence_for", "evidence_against"],
"actor_profile": ["actor_profile_id", "actor", "motive", "means", "opportunity", "modus_operandi"],
"gap": ["gap_id", "gap_class", "description", "detected_in", "severity"],
"relation": ["relation_id", "relation_class", "nodes", "connection_description", "confidence_band"],
"case_report": ["case_id", "chapters", "quality_rubrics", "overall_quality_score"],
"residual_uncertainty": ["unknowns_known", "calibration_table", "what_would_change_conclusion"],
"index": ["stats", "hubs"],
"log": [],
}
# entity_class → name of the id field an entity of that class must carry.
# validate_required_fields reports any entity_class that is not a key here as
# unknown; _build_alias_index also uses the id value as an alias key.
ENTITY_CLASS_ID = {
"person": "person_id",
"organization": "organization_id",
"location": "location_id",
"event": "event_id",
"uap_object": "uap_object_id",
"vehicle": "vehicle_id",
"operation": "operation_id",
"concept": "concept_id",
}
# wiki-link namespace → directory under UFO_ROOT
# resolve_link turns [[<ns>/<id>]] into UFO_ROOT/<directory>/<id>.md when <ns>
# is a key of this table.
NAMESPACE_DIR = {
"people": "wiki/entities/people",
"org": "wiki/entities/organizations",
"loc": "wiki/entities/locations",
"event": "wiki/entities/events",
"uap": "wiki/entities/uap-objects",
"vehicle": "wiki/entities/vehicles",
"op": "wiki/entities/operations",
"concept": "wiki/entities/concepts",
"table": "wiki/tables",
"image": "wiki/images",
"evidence": "case/evidence",
"witness": "case/witnesses",
"hypothesis": "case/hypotheses",
"profile": "case/profiles",
"gap": "case/gaps",
"relation": "case/connect-the-dots",
"case": "case", # for [[case/case-report]], [[case/residual-uncertainty]]
}
# Matches [[target]] and [[target|label]]. Group 1 is the target (any run of
# characters other than "]" and "|"); the optional label is not captured.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]")
def utc_now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def read_md(path: Path) -> tuple[dict, str]:
    """Split a Markdown file into ``(frontmatter, body)``.

    A file has frontmatter when it starts with ``---`` and a later line
    consisting only of ``---`` closes it.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        ``(fm, body)`` where:
        - no opening or no closing delimiter → ``({}, <whole file content>)``;
        - YAML that fails to parse, or parses to something other than a
          mapping → ``({"_yaml_error": <message>}, <text after the delimiter>)``;
        - otherwise the parsed mapping (``{}`` when empty) and the body with
          leading newlines removed.

    Raises:
        OSError / UnicodeDecodeError: propagated from ``Path.read_text``.
    """
    content = path.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return {}, content

    # The closing delimiter must sit on a line of its own. A plain substring
    # search would also stop at a "---" embedded in a YAML value (for example
    # ``title: a---b``) and silently truncate the frontmatter.
    closing = re.compile(r"^---[ \t]*\r?$", re.MULTILINE).search(content, 3)
    if closing is None:
        return {}, content

    raw_yaml = content[3:closing.start()]
    rest = content[closing.end():]
    try:
        fm = yaml.safe_load(raw_yaml.strip()) or {}
    except yaml.YAMLError as e:
        return {"_yaml_error": str(e)}, rest

    # Every caller treats the frontmatter as a dict (``fm.get(...)``). A scalar
    # or list would crash the whole lint run, so report it as a parse error
    # on this file instead.
    if not isinstance(fm, dict):
        return {"_yaml_error": f"frontmatter is not a mapping (got {type(fm).__name__})"}, rest
    return fm, rest.lstrip("\n")
def write_md(path: Path, fm: dict, body: str) -> bool:
    """Render ``fm`` + ``body`` as a frontmatter Markdown file at ``path``.

    The file is only rewritten when the rendered text differs from what is
    already on disk.

    Returns:
        True if the file was written, False if it already had this content.
    """
    front = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    # Put a blank line after the closing delimiter unless the body already
    # begins with a newline of its own.
    separator = "" if body.startswith("\n") else "\n"
    rendered = f"---\n{front}---\n{separator}{body}"
    if path.exists():
        if path.read_text(encoding="utf-8") == rendered:
            return False
    path.write_text(rendered, encoding="utf-8")
    return True
def iter_md_files(scope: str) -> list[Path]:
    """Return every ``*.md`` file under the roots selected by ``scope``, sorted.

    ``"wiki"`` selects WIKI_BASE, ``"case"`` selects CASE_BASE and ``"all"``
    selects both; any other value selects nothing.
    """
    roots = []
    if scope in ("wiki", "all"):
        roots.append(WIKI_BASE)
    if scope in ("case", "all"):
        roots.append(CASE_BASE)
    return sorted(md for root in roots for md in root.rglob("*.md"))
def resolve_link(target: str) -> tuple[str, Path | None]:
    """Map a wiki-link target to ``(kind, path)``.

    Tried in order:
    1. ``<doc-id>/p<NNN>`` (exactly three digits) → ``("page", wiki/pages/<doc-id>/pNNN.md)``
    2. ``<ns>/<id>`` with ``<ns>`` in NAMESPACE_DIR → ``(<ns>, <namespace dir>/<id>.md)``
    3. anything else → ``("document", wiki/documents/<target>.md)``

    The path is built without checking that it exists. Every branch returns a
    Path; ``None`` is never produced by this implementation.
    """
    name = target.strip()

    page = re.match(r"^([a-z0-9][a-z0-9-]*)/p(\d{3})$", name)
    if page is not None:
        doc_id, padded = page.groups()
        return ("page", UFO_ROOT / "wiki" / "pages" / doc_id / f"p{padded}.md")

    # Only the first "/" separates namespace from id; the rest may contain more.
    ns, slash, rest = name.partition("/")
    if slash and ns in NAMESPACE_DIR:
        return (ns, UFO_ROOT / NAMESPACE_DIR[ns] / f"{rest}.md")

    # Fallback: treat the whole target as a bare doc_id.
    return ("document", UFO_ROOT / "wiki" / "documents" / f"{name}.md")
def collect_inventory(scope: str) -> dict:
    """Read every Markdown file in ``scope`` and index what the checks need.

    Returns a dict with:
    - ``files``: scanned paths, in sorted order
    - ``by_path``: ``str(path)`` → ``{"fm", "body", "rel"}`` (``rel`` is relative to UFO_ROOT)
    - ``links_out``: ``str(source path)`` → list of ``{"target", "kind", "resolved"}``
    - ``links_in``: ``str(resolved target path)`` → list of source paths
    - ``entity_files``: always empty here (nothing in this function fills it)
    - ``canonical_name_index``: canonical_name (or canonical_title) → paths
    - ``page_files_by_doc``: doc_id → list of ``(page_number, path)``
    """
    scanned: list[Path] = []
    by_path: dict = {}
    links_out = defaultdict(list)
    links_in = defaultdict(list)
    name_index = defaultdict(list)
    pages_by_doc = defaultdict(list)

    for md_path in iter_md_files(scope):
        fm, body = read_md(md_path)
        rel = md_path.relative_to(UFO_ROOT)
        key = str(md_path)
        scanned.append(md_path)
        by_path[key] = {"fm": fm, "body": body, "rel": rel}

        # Pages are grouped per document for the sequence check; pages without
        # a doc_id or with a non-integer page_number are left out.
        if fm.get("type") == "page":
            doc_id = fm.get("doc_id", "")
            page_num = fm.get("page_number")
            if doc_id and isinstance(page_num, int):
                pages_by_doc[doc_id].append((page_num, md_path))

        # canonical_name takes precedence over canonical_title.
        display_name = fm.get("canonical_name") or fm.get("canonical_title")
        if display_name:
            name_index[display_name].append(md_path)

        # Only the body is scanned for wiki-links, not the frontmatter.
        for link_target in WIKI_LINK_RE.findall(body):
            kind, resolved = resolve_link(link_target)
            links_out[key].append({"target": link_target, "kind": kind, "resolved": resolved})
            if resolved is not None:
                links_in[str(resolved)].append(md_path)

    return {
        "files": scanned,
        "by_path": by_path,
        "links_out": links_out,
        "links_in": links_in,
        "entity_files": {},
        "canonical_name_index": name_index,
        "page_files_by_doc": pages_by_doc,
    }
def validate_required_fields(fm: dict, path: Path) -> list[str]:
    """Return the schema errors found in one frontmatter mapping.

    Args:
        fm: Parsed frontmatter (may carry ``_yaml_error`` from ``read_md``).
        path: Unused by the checks; kept as part of the signature.

    Returns:
        Error strings, empty when the frontmatter passes. A YAML parse error
        is reported alone, without running the other checks.
    """
    if "_yaml_error" in fm:
        return [f"yaml-parse-error: {fm['_yaml_error']}"]

    kind = fm.get("type")

    # Universal fields: only key presence is checked.
    problems = [f"missing-universal-field: {name}" for name in UNIVERSAL_REQUIRED if name not in fm]

    if kind not in TYPES_WITHOUT_CANONICAL_NAME and all(k not in fm for k in NAME_FIELDS):
        problems.append(f"missing-name-field: need one of {NAME_FIELDS}")

    # Type-specific fields: absent, None and [] all count as missing.
    for name in TYPE_REQUIRED.get(kind, ()):
        value = fm.get(name)
        if value is None or value == []:
            problems.append(f"missing-{kind}-field: {name}")

    if kind == "entity":
        # Each known entity class must carry its own id field.
        entity_class = fm.get("entity_class")
        id_field = ENTITY_CLASS_ID.get(entity_class)
        if id_field is None:
            problems.append(f"unknown-entity-class: {entity_class!r}")
        elif id_field not in fm:
            problems.append(f"missing-entity-id: {id_field} for entity_class={entity_class}")
    elif kind == "evidence":
        # Grade A needs >= 3 custody steps, B >= 2, C >= 1; other grades need none.
        grade = fm.get("evidence_grade")
        steps = fm.get("chain_of_custody") or []
        required = {"A": 3, "B": 2, "C": 1}.get(grade, 0)
        if len(steps) < required:
            problems.append(f"evidence-grade-{grade}-needs-{required}-custody-steps (has {len(steps)})")
    elif kind == "hypothesis":
        # A posterior above 0.50 must be backed by at least two evidence_for items.
        post = fm.get("posterior_probability") or 0
        if isinstance(post, (int, float)) and post > 0.50:
            ev_for = fm.get("evidence_for") or []
            if len(ev_for) < 2:
                problems.append(f"hypothesis-posterior-{post}-needs-2-evidence_for (has {len(ev_for)})")

    return problems
def validate_page_sequences(inv: dict) -> list[str]:
    """Check that each document's page files number exactly 1..page_count.

    Documents without a doc_id or with a non-integer page_count are skipped.
    Page numbers are compared as a set, so repeated numbers are not reported.

    Returns:
        One error string per document with missing or extra page numbers.
    """
    problems = []
    documents = (
        info["fm"]
        for info in inv["by_path"].values()
        if info["fm"].get("type") == "document"
    )
    for fm in documents:
        doc_id = fm.get("doc_id")
        page_count = fm.get("page_count")
        if not (doc_id and isinstance(page_count, int)):
            continue
        found = {num for num, _ in inv["page_files_by_doc"].get(doc_id, [])}
        wanted = set(range(1, page_count + 1))
        missing = sorted(wanted - found)
        extra = sorted(found - wanted)
        if missing or extra:
            problems.append(f"doc {doc_id}: page sequence broken (missing={missing}, extra={extra})")
    return problems
def validate_canonical_uniqueness(inv: dict) -> list[str]:
    """Report canonical names shared by several files.

    A shared name is accepted only when every file using it has a truthy
    ``disambiguation_note``; otherwise one error lists all the files involved.
    """
    problems = []
    for name, paths in inv["canonical_name_index"].items():
        if len(paths) < 2:
            continue
        lacks_note = any(
            not inv["by_path"][str(p)]["fm"].get("disambiguation_note") for p in paths
        )
        if lacks_note:
            where = [str(p.relative_to(UFO_ROOT)) for p in paths]
            problems.append(f"duplicate-canonical-name {name!r}: in {where}")
    return problems
def validate_links(inv: dict) -> tuple[list[str], list[str]]:
    """Check that every recorded wiki-link points at an existing file.

    Returns:
        ``(broken, warned)``. ``broken`` holds one message per link whose
        resolved path is None or does not exist. ``warned`` is always empty
        in this implementation.
    """
    broken: list[str] = []
    warned: list[str] = []
    for source, links in inv["links_out"].items():
        for link in links:
            resolved = link["resolved"]
            if resolved is None:
                origin = Path(source).relative_to(UFO_ROOT)
                broken.append(f"unparseable-link in {origin}: [[{link['target']}]]")
            elif not resolved.exists():
                origin = Path(source).relative_to(UFO_ROOT)
                broken.append(f"broken-link in {origin}: [[{link['target']}]] → {resolved.relative_to(UFO_ROOT)}")
    return broken, warned
def detect_orphans(inv: dict) -> list[str]:
    """List entity files that have no entry in ``inv["links_in"]``.

    Returns:
        One ``"orphan: <path relative to UFO_ROOT>"`` string per such entity.
    """
    inbound = inv["links_in"]
    orphans = []
    for path_str, info in inv["by_path"].items():
        if info["fm"].get("type") != "entity":
            continue
        entity_path = Path(path_str)
        if inbound.get(str(entity_path)):
            continue
        orphans.append(f"orphan: {entity_path.relative_to(UFO_ROOT)}")
    return orphans
# ----------------------------------------------------------------------
# Backlink rebuild
# ----------------------------------------------------------------------
def _canonicalize_name(name: str) -> str:
"""Same algorithm used by script 03 (kebab-case ASCII-fold)."""
import unicodedata as ud
nfkd = ud.normalize("NFKD", name or "")
ascii_str = "".join(c for c in nfkd if not ud.combining(c))
lower = ascii_str.lower()
replaced = re.sub(r"[^a-z0-9-]", "-", lower)
collapsed = re.sub(r"-+", "-", replaced).strip("-")
if collapsed and collapsed[0].isdigit():
collapsed = "x-" + collapsed
return collapsed
# Key of a page's entities_extracted mapping → entity_class of the entity files
# it refers to. rebuild_backlinks ignores page classes that are not keys here
# and leaves entities whose class is not a value here untouched.
PAGE_CLASS_TO_ENTITY_CLASS = {
"people": "person",
"organizations": "organization",
"locations": "location",
"vehicles": "vehicle",
"operations": "operation",
"concepts": "concept",
}
def _build_alias_index(inv: dict) -> dict[tuple[str, str], Path]:
"""Build {(entity_class, alias_key): entity_path} where alias_key is the
canonicalized form of every name/alias/canonical_name/concept_id under that
entity. Used to resolve free-text entity names extracted by Haiku back to
the curated entity file (which may have a friendlier canonical_id).
"""
index: dict[tuple[str, str], Path] = {}
for path_str, info in inv["by_path"].items():
fm = info["fm"]
if fm.get("type") != "entity":
continue
ec = fm.get("entity_class")
if not ec:
continue
keys: set[str] = set()
# canonical name + aliases + canonical_id itself
cname = fm.get("canonical_name")
if cname:
keys.add(_canonicalize_name(cname))
for alias in (fm.get("aliases") or []):
if isinstance(alias, str):
keys.add(_canonicalize_name(alias))
id_field = ENTITY_CLASS_ID.get(ec)
if id_field and id_field in fm:
keys.add(_canonicalize_name(fm[id_field]))
# Also include filename stem
keys.add(_canonicalize_name(Path(path_str).stem))
for key in keys:
if key:
index[(ec, key)] = Path(path_str)
return index
def rebuild_backlinks(inv: dict, dry_run: bool) -> tuple[int, int]:
    """For each entity file, materialize mentioned_in[] from page entities_extracted.

    Resolution of "free-text entity name from Haiku" → "curated entity file"
    uses the alias index (canonical_name, aliases, class id and file stem all
    match).

    Args:
        inv: Inventory produced by ``collect_inventory``.
        dry_run: When True nothing is written; entities that would change are
            only counted.

    Returns:
        ``(entities_updated, entities_unchanged)``. Entities whose class is
        not a value of PAGE_CLASS_TO_ENTITY_CLASS are not counted at all.
    """
    updated = unchanged = 0
    alias_index = _build_alias_index(inv)

    # entity_file_path → list[(page_id, doc_id, role)]
    mentions_by_entity: dict[str, list[tuple[str, str, str]]] = defaultdict(list)
    for info in inv["by_path"].values():
        fm = info["fm"]
        if fm.get("type") != "page":
            continue
        page_id = fm.get("page_id", "")
        doc_id = fm.get("doc_id", "")
        ents = fm.get("entities_extracted")
        # Missing, empty or malformed (non-mapping) extraction data has no
        # mentions to harvest; it must not abort the run.
        if not isinstance(ents, dict):
            continue
        for cls, entries in ents.items():
            if cls not in PAGE_CLASS_TO_ENTITY_CLASS:
                continue
            if not isinstance(entries, (list, tuple)):
                continue
            ec = PAGE_CLASS_TO_ENTITY_CLASS[cls]
            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                name = entry.get("name")
                # _canonicalize_name only accepts text.
                if not isinstance(name, str) or not name:
                    continue
                target = alias_index.get((ec, _canonicalize_name(name)))
                if not target:
                    continue
                # Only people carry a per-page role; every other class is
                # recorded as a plain mention.
                role = entry.get("role_in_page", "mentioned") if cls == "people" else "mentioned"
                mentions_by_entity[str(target)].append((page_id, doc_id, role))

    # Walk all entities and write mentioned_in[]
    backlinked_classes = set(PAGE_CLASS_TO_ENTITY_CLASS.values())
    for path_str, info in inv["by_path"].items():
        fm = info["fm"]
        if fm.get("type") != "entity":
            continue
        if fm.get("entity_class") not in backlinked_classes:
            # event, uap_object: their links come via documented_in/observed_in_event, not page entities_extracted
            continue

        # Collapse repeated mentions into one record per page; the role kept
        # is the one from the first mention seen on that page.
        per_page: dict[str, dict] = {}
        for page_id, _doc_id, role in mentions_by_entity.get(path_str, []):
            if page_id not in per_page:
                per_page[page_id] = {"page": f"[[{page_id}]]", "mention_count": 0, "role_in_page": role}
            per_page[page_id]["mention_count"] += 1

        # Most-mentioned pages first; the sort is stable, so ties keep scan order.
        mentioned_in = sorted(per_page.values(), key=lambda x: -x["mention_count"])
        # The document is taken from the page_id prefix ("<doc-id>/pNNN").
        unique_docs = {pg.split("/", 1)[0] for pg in per_page}

        new_fm = dict(fm)
        new_fm["mentioned_in"] = mentioned_in
        new_fm["total_mentions"] = sum(x["mention_count"] for x in mentioned_in)
        new_fm["documents_count"] = len(unique_docs)

        # Idempotency: only bump last_lint if the substantive data changed
        snapshot_prev = {k: v for k, v in fm.items() if k != "last_lint"}
        snapshot_new = {k: v for k, v in new_fm.items() if k != "last_lint"}
        if snapshot_prev == snapshot_new:
            unchanged += 1
            continue

        new_fm["last_lint"] = utc_now_iso()
        if dry_run or write_md(Path(path_str), new_fm, info["body"]):
            updated += 1
        else:
            unchanged += 1
    return updated, unchanged
# ----------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------
def main():
    """CLI entry point: scan, validate, rebuild backlinks and print a report.

    Flags:
        --scope  which tree(s) to scan: wiki, case or all (default).
        --fix    write rebuilt backlinks and append an entry to LOG_PATH;
                 without it nothing is written.
        --strict exit with status 1 when at least one error was found.
    """
    parser = argparse.ArgumentParser(description="Lint wiki/case + rebuild backlinks.")
    parser.add_argument("--scope", choices=["wiki", "case", "all"], default="all", help="scope to scan")
    parser.add_argument("--fix", action="store_true", help="actually rewrite backlinks (default = report only)")
    parser.add_argument("--strict", action="store_true", help="exit non-zero on any error")
    opts = parser.parse_args()

    print(f"Scanning scope={opts.scope}...", flush=True)
    inv = collect_inventory(opts.scope)
    file_count = len(inv["files"])
    print(f" files: {file_count}", flush=True)

    # Errors, in report order: required fields, page sequences, canonical
    # names, broken links.
    errors: list[str] = [
        f"{Path(path_str).relative_to(UFO_ROOT)}: {err}"
        for path_str, info in inv["by_path"].items()
        for err in validate_required_fields(info["fm"], Path(path_str))
    ]
    errors += validate_page_sequences(inv)
    errors += validate_canonical_uniqueness(inv)
    broken, link_warnings = validate_links(inv)
    errors += broken

    # Orphans are warnings, not errors.
    warnings: list[str] = [*link_warnings, *detect_orphans(inv)]

    updated, unchanged = rebuild_backlinks(inv, dry_run=not opts.fix)

    def show(items: list[str], cap: int) -> None:
        # Print at most `cap` items, then a one-line count of the remainder.
        for item in items[:cap]:
            print(f"{item}")
        overflow = len(items) - cap
        if overflow > 0:
            print(f" … and {overflow} more")

    print("\n=== LINT REPORT ===")
    print(f" files scanned: {file_count}")
    print(f" errors: {len(errors)}")
    show(errors, 50)
    print(f" warnings: {len(warnings)}")
    show(warnings, 20)
    action = "updated" if opts.fix else "would-update"
    print(f" backlinks: {action}={updated}, unchanged={unchanged}")

    # The log is only touched in --fix mode, keeping the default run read-only.
    if opts.fix:
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            entry = [
                f"\n## {utc_now_iso()} — LINT (Phase 8)\n",
                f"- operator: archivist\n- scope: {opts.scope}\n- files_scanned: {file_count}\n",
                f"- errors: {len(errors)}\n- warnings: {len(warnings)}\n",
                f"- backlinks_updated: {updated}\n- backlinks_unchanged: {unchanged}\n",
            ]
            if errors:
                entry.append("- top_errors:\n")
                entry.extend(f" - {e}\n" for e in errors[:10])
            fh.writelines(entry)

    if opts.strict and errors:
        sys.exit(1)


if __name__ == "__main__":
    main()