disclosure-bureau/scripts/06-graph-export.py

422 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
"""
06-graph-export.py — Export a graph JSON of the wiki for client-side viz
Walks wiki/ and case/, builds:
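nodes:
- one per document
- one per page
- one per entity (person, organization, location, event, uap_object,
  vehicle, operation, concept)
- one per gap, evidence, hypothesis, witness, profile, relation, case
  report / residual uncertainty, timeline (case artifacts)
edges:
- document → page (contains)
- document → entity (key-entity) and document → gap (flags-gap)
- entity → page (mentioned-in, via each entity's mentioned_in[]); page →
  entity edges from entities_extracted are not emitted
- entity → entity (observed-in-event, occurred-at via primary_location,
  observed-uap via uap_objects); related_* links are not exported yet
- relation → each member listed in its nodes[] (relates; connect-the-dots)
- gap → each page/document listed in its detected_in[] (detected-in)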
Output:
wiki/graph.json
The JSON shape is friendly to Cytoscape / Sigma.js / react-flow. Each node
carries `type`, `label`, `entity_class` (when applicable), and `data` with the
frontmatter fields useful for filters (collection, country, date, confidence,
etc.).
Usage:
./06-graph-export.py
./06-graph-export.py --out /path/to/graph.json
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

try:
    import yaml
except ImportError:
    sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
    sys.exit(1)
# Root of the archive. Defaults to the original author's checkout; set the
# UFO_ROOT environment variable to run the export against another location.
UFO_ROOT = Path(os.environ.get("UFO_ROOT") or "/Users/guto/ufo")
# Markdown trees that are walked to build the graph.
WIKI_BASE = UFO_ROOT / "wiki"
CASE_BASE = UFO_ROOT / "case"
# Default destination of the exported graph (overridable with --out).
DEFAULT_OUT = WIKI_BASE / "graph.json"
# Append-only run log; one entry is added per export.
LOG_PATH = WIKI_BASE / "log.md"
# Matches [[target]] and [[target|label]]; group 1 is the target only.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]")
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now_utc = datetime.now(timezone.utc)
    stamp = now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
    return stamp
def read_md(path: Path) -> tuple[dict, str]:
    """Read a markdown file and split it into ``(frontmatter, body)``.

    The frontmatter is the YAML block between an opening ``---`` on the first
    line and the next line that consists only of ``---``. Matching the closing
    delimiter on a whole line (rather than as a substring) keeps a ``---``
    inside a YAML value from truncating the block.

    Args:
        path: markdown file to read (decoded as UTF-8).

    Returns:
        ``(frontmatter, body)``. ``frontmatter`` is always a dict: it is empty
        when the file has no frontmatter, the block is never closed, the YAML
        is invalid, or the YAML is not a mapping. ``body`` is the text after
        the closing delimiter, or the whole file when no frontmatter block was
        recognized.
    """
    text = path.read_text(encoding="utf-8")

    first_line, newline, rest = text.partition("\n")
    if not newline or first_line.rstrip() != "---":
        return {}, text

    closing = re.search(r"^---[ \t]*$", rest, flags=re.MULTILINE)
    if closing is None:
        return {}, text

    raw_frontmatter = rest[: closing.start()]
    body = rest[closing.end():]
    try:
        frontmatter = yaml.safe_load(raw_frontmatter)
    except yaml.YAMLError:
        # Best effort: a broken header should not abort the whole export.
        return {}, body
    # Callers use ``fm.get(...)``; a list/scalar document would crash them.
    if not isinstance(frontmatter, dict):
        return {}, body
    return frontmatter, body
def parse_wiki_link(target: str) -> tuple[str, str] | None:
    """Extract ``(namespace, id)`` from a wiki-link target string.

    Recognized shapes, checked in this order:

    - ``doc-id/pNNN`` (three or more digits) → ``("page", "doc-id/pNNN")``
    - ``ns/rest`` → ``(ns, rest)``, split on the first slash
    - ``doc-id`` (no slash) → ``("document", "doc-id")``

    Surrounding whitespace is ignored.

    Returns None for unrecognized targets: an empty or whitespace-only
    target, or one whose namespace or id part is empty (``"/x"``, ``"x/"``).
    """
    link = target.strip()
    if not link:
        return None

    # Page link: doc-id/pNNN. Page numbers above 999 keep the same shape with
    # more digits, so accept three or more.
    if re.fullmatch(r"[a-z0-9][a-z0-9-]*/p\d{3,}", link):
        return ("page", link)

    namespace, slash, rest = link.partition("/")
    if not slash:
        # Bare doc_id
        return ("document", link)
    if not namespace or not rest:
        return None
    return (namespace, rest)
def node_id_from_link(target: str) -> str | None:
    """Return the graph node id (``"<namespace>:<id>"``) for a wiki-link target.

    Yields None when ``parse_wiki_link`` does not recognize the target.
    """
    parts = parse_wiki_link(target)
    if parts:
        namespace, ident = parts
        return f"{namespace}:{ident}"
    return None
def make_node(node_id: str, ntype: str, label: str, **extra) -> dict:
    """Build a node dict with ``id``, ``type`` and ``label``.

    Any extra keyword arguments are stored unchanged under ``data``; the
    ``data`` key is omitted when no extras are given.
    """
    node = dict(id=node_id, type=ntype, label=label)
    if not extra:
        return node
    node["data"] = extra
    return node
def make_edge(source: str, target: str, kind: str, weight: float = 1.0) -> dict:
    """Build an edge dict with ``source``, ``target``, ``kind`` and ``weight``."""
    edge = dict(source=source, target=target, kind=kind, weight=weight)
    return edge
# ----------------------------------------------------------------------
def collect_documents(graph: dict) -> None:
    """Add document nodes and their outgoing edges to *graph*.

    Scans the ``*.md`` files directly under ``WIKI_BASE / "documents"`` in
    sorted order. For each file whose frontmatter ``type`` is ``"document"``:

    - stores a ``document:<doc_id>`` node in ``graph["nodes"]`` (``doc_id``
      falls back to the file stem when missing or empty);
    - appends a ``contains`` edge for each ``pages[].page_id`` wiki-link;
    - appends a ``key-entity`` edge for each wiki-link under ``key_entities``;
    - appends a ``flags-gap`` edge for each wiki-link in ``gaps_flagged``.

    Only the first ``[[...]]`` link of each reference string is used.
    Frontmatter values of an unexpected type (non-list containers, non-string
    references) are skipped instead of raising.

    Args:
        graph: graph under construction; its ``"nodes"`` dict and ``"edges"``
            list are updated in place.
    """

    def listed(value):
        # Frontmatter is free-form YAML: anything that is not a list counts as empty.
        return value if isinstance(value, list) else []

    def add_link_edge(source_id: str, ref, kind: str) -> None:
        # Resolve the first [[wiki-link]] in ``ref`` and connect source → target.
        if not isinstance(ref, str):
            return
        match = WIKI_LINK_RE.search(ref)
        if not match:
            return
        target_id = node_id_from_link(match.group(1))
        if target_id:
            graph["edges"].append(make_edge(source_id, target_id, kind))

    docs_dir = WIKI_BASE / "documents"
    for path in sorted(docs_dir.glob("*.md")):
        fm, _ = read_md(path)
        if fm.get("type") != "document":
            continue

        # ``or`` (not a .get default) so an explicit null/empty value also falls back.
        doc_id = fm.get("doc_id") or path.stem
        node_id = f"document:{doc_id}"
        graph["nodes"][node_id] = make_node(
            node_id, "document",
            fm.get("canonical_title") or doc_id,
            collection=fm.get("collection"),
            document_class=fm.get("document_class"),
            page_count=fm.get("page_count"),
            content_classification=fm.get("content_classification"),
            document_date=fm.get("document_date"),
            highest_classification=fm.get("highest_classification"),
        )

        # document → page edges
        for page_ref in listed(fm.get("pages")):
            if isinstance(page_ref, dict):
                add_link_edge(node_id, page_ref.get("page_id"), "contains")

        # document → key entities (grouped by entity class; the class name is not needed)
        key_entities = fm.get("key_entities")
        if isinstance(key_entities, dict):
            for refs in key_entities.values():
                for ref in listed(refs):
                    add_link_edge(node_id, ref, "key-entity")

        # gaps_flagged
        for ref in listed(fm.get("gaps_flagged")):
            add_link_edge(node_id, ref, "flags-gap")
def collect_pages(graph: dict) -> None:
    """Add one ``page:<page_id>`` node per wiki page to *graph*.

    Walks every ``*.md`` file under ``WIKI_BASE / "pages"`` (recursively, in
    sorted order) and keeps those whose frontmatter ``type`` is ``"page"`` and
    that carry a non-empty ``page_id``. The node label is
    ``"<doc_id> p<page_number>"`` with the page number left-padded with zeros
    to three characters; a missing or null ``doc_id`` / ``page_number`` is
    shown as ``?``.

    No edges are added here: resolving ``entities_extracted`` to canonical
    entity ids would require the alias-match lookup, and the lint script's
    ``mentioned_in[]`` is the source of truth for who-mentions-who, so the
    page links are emitted as entity → page edges by ``collect_entities``.

    Args:
        graph: graph under construction; its ``"nodes"`` dict is updated in
            place.
    """
    for path in sorted((WIKI_BASE / "pages").rglob("*.md")):
        fm, _ = read_md(path)
        if fm.get("type") != "page":
            continue
        page_id = fm.get("page_id")
        if not page_id:
            continue

        # A null page_number used to blow up in the format spec; treat it like
        # a missing one and pad via str so any scalar type is accepted.
        page_number = fm.get("page_number")
        number_text = str("?" if page_number is None else page_number).rjust(3, "0")
        doc_label = fm.get("doc_id") or "?"

        node_id = f"page:{page_id}"
        graph["nodes"][node_id] = make_node(
            node_id, "page",
            f"{doc_label} p{number_text}",
            page_number=page_number,
            page_type=fm.get("page_type"),
            content_classification=fm.get("content_classification"),
            language_detected=fm.get("language_detected"),
        )
def collect_entities(graph: dict) -> None:
    """Add entity nodes and their outgoing edges to *graph*.

    Walks every ``*.md`` file under ``WIKI_BASE / "entities"`` (recursively,
    in sorted order) and keeps those whose frontmatter ``type`` is
    ``"entity"`` and whose ``entity_class`` is one of the known classes. For
    each kept file:

    - stores a ``<namespace>:<id>`` node (id from the class-specific id
      field, falling back to the file stem);
    - appends a ``mentioned-in`` edge per ``mentioned_in[].page`` link,
      weighted by ``mention_count`` (1 when missing or not a number);
    - for events: ``occurred-at`` from ``primary_location`` and
      ``observed-uap`` for each entry of ``uap_objects``;
    - for UAP objects: ``observed-in-event`` from ``observed_in_event``.

    Only the first ``[[...]]`` link of each reference string is used, and
    references that are not strings are skipped.

    Args:
        graph: graph under construction; its ``"nodes"`` dict and ``"edges"``
            list are updated in place.
    """
    # Loop-invariant lookup tables: entity_class → id field / node namespace.
    id_field_by_class = {
        "person": "person_id",
        "organization": "organization_id",
        "location": "location_id",
        "event": "event_id",
        "uap_object": "uap_object_id",
        "vehicle": "vehicle_id",
        "operation": "operation_id",
        "concept": "concept_id",
    }
    namespace_by_class = {
        "person": "people",
        "organization": "org",
        "location": "loc",
        "event": "event",
        "uap_object": "uap",
        "vehicle": "vehicle",
        "operation": "op",
        "concept": "concept",
    }

    def listed(value):
        # Frontmatter is free-form YAML: anything that is not a list counts as empty.
        return value if isinstance(value, list) else []

    def add_link_edge(source_id: str, ref, kind: str, weight: float = 1.0) -> None:
        # Resolve the first [[wiki-link]] in ``ref`` and connect source → target.
        if not isinstance(ref, str):
            return
        match = WIKI_LINK_RE.search(ref)
        if not match:
            return
        target_id = node_id_from_link(match.group(1))
        if target_id:
            graph["edges"].append(make_edge(source_id, target_id, kind, weight))

    for path in sorted((WIKI_BASE / "entities").rglob("*.md")):
        fm, _ = read_md(path)
        if fm.get("type") != "entity":
            continue

        entity_class = fm.get("entity_class")
        # Non-string classes (e.g. a YAML list) are unhashable/unknown: skip.
        if not isinstance(entity_class, str):
            continue
        id_field = id_field_by_class.get(entity_class)
        namespace = namespace_by_class.get(entity_class)
        if not id_field or not namespace:
            continue

        entity_id = fm.get(id_field) or path.stem
        node_id = f"{namespace}:{entity_id}"
        graph["nodes"][node_id] = make_node(
            node_id, namespace,
            fm.get("canonical_name") or entity_id,
            entity_class=entity_class,
            aliases=fm.get("aliases"),
            country=fm.get("country"),
            location_type=fm.get("location_type"),
            organization_type=fm.get("organization_type"),
            shape=fm.get("shape"),
            color=fm.get("color"),
            event_class=fm.get("event_class"),
            date_start=fm.get("date_start"),
            concept_class=fm.get("concept_class"),
            total_mentions=fm.get("total_mentions"),
            enrichment_status=fm.get("enrichment_status"),
        )

        # entity → page edges (via mentioned_in[])
        for mention in listed(fm.get("mentioned_in")):
            if not isinstance(mention, dict):
                continue
            count = mention.get("mention_count", 1)
            # Keep the weight numeric: null or textual counts fall back to 1.
            if isinstance(count, bool) or not isinstance(count, (int, float)):
                count = 1
            add_link_edge(node_id, mention.get("page"), "mentioned-in", count)

        # event-specific links
        if entity_class == "event":
            add_link_edge(node_id, fm.get("primary_location"), "occurred-at")
            for uap_ref in listed(fm.get("uap_objects")):
                add_link_edge(node_id, uap_ref, "observed-uap")

        # uap_object → event
        if entity_class == "uap_object":
            add_link_edge(node_id, fm.get("observed_in_event"), "observed-in-event")
def collect_case_artifacts(graph: dict) -> None:
    """Add gaps, evidence, witnesses, hypotheses, profiles, relations.

    Walks every ``*.md`` file under ``CASE_BASE`` (recursively, in sorted
    order) and keeps those whose frontmatter ``type`` is a known case-artifact
    type (the ones above plus case reports, residual uncertainty and
    timelines). For each kept file:

    - stores a ``<namespace>:<id>`` node, with the id taken from the
      type-specific id field and falling back to the file stem
      (``residual_uncertainty`` has no id field and always uses the stem);
    - for gaps: appends a ``detected-in`` edge per ``detected_in`` link;
    - for relations: appends a ``relates`` edge per ``nodes`` link.

    Only the first ``[[...]]`` link of each reference string is used, and
    references that are not strings are skipped.

    Args:
        graph: graph under construction; its ``"nodes"`` dict and ``"edges"``
            list are updated in place.
    """
    # Loop-invariant lookup tables: artifact type → node namespace / id field.
    type_to_ns = {
        "gap": "gap",
        "evidence": "evidence",
        "witness_analysis": "witness",
        "hypothesis": "hypothesis",
        "actor_profile": "profile",
        "relation": "relation",
        "case_report": "case",
        "residual_uncertainty": "case",
        "timeline": "timeline",
    }
    type_to_id_field = {
        "gap": "gap_id",
        "evidence": "evidence_id",
        "witness_analysis": "witness_id",
        "hypothesis": "hypothesis_id",
        "actor_profile": "actor_profile_id",
        "relation": "relation_id",
        "case_report": "case_id",
        "timeline": "scope_id",
    }
    # Artifact types that carry a list of wiki-links: type → (field, edge kind).
    link_lists = {
        "gap": ("detected_in", "detected-in"),
        "relation": ("nodes", "relates"),
    }

    def add_link_edge(source_id: str, ref, kind: str) -> None:
        # Resolve the first [[wiki-link]] in ``ref`` and connect source → target.
        if not isinstance(ref, str):
            return
        match = WIKI_LINK_RE.search(ref)
        if not match:
            return
        target_id = node_id_from_link(match.group(1))
        if target_id:
            graph["edges"].append(make_edge(source_id, target_id, kind))

    for path in sorted(CASE_BASE.rglob("*.md")):
        fm, _ = read_md(path)
        artifact_type = fm.get("type")
        # Non-string types (e.g. a YAML list) are unhashable/unknown: skip.
        if not isinstance(artifact_type, str) or artifact_type not in type_to_ns:
            continue

        namespace = type_to_ns[artifact_type]
        id_field = type_to_id_field.get(artifact_type)
        artifact_id = (fm.get(id_field) if id_field else None) or path.stem
        node_id = f"{namespace}:{artifact_id}"
        graph["nodes"][node_id] = make_node(
            node_id, namespace,
            fm.get("canonical_title") or artifact_id,
            t=artifact_type,
            severity=fm.get("severity"),
            evidence_grade=fm.get("evidence_grade"),
            status=fm.get("status"),
            verdict=fm.get("verdict"),
            connection_strength=fm.get("connection_strength"),
        )

        # gap → detected_in (pages) and relation → nodes[]
        if artifact_type in link_lists:
            field, edge_kind = link_lists[artifact_type]
            refs = fm.get(field)
            for ref in refs if isinstance(refs, list) else []:
                add_link_edge(node_id, ref, edge_kind)
# ----------------------------------------------------------------------
def main():
    """Build the wiki graph, write it as JSON and append a run-log entry.

    Steps:

    1. Parse ``--out`` (defaults to ``DEFAULT_OUT``).
    2. Run the document, page, entity and case-artifact collectors.
    3. Drop duplicate edges (same source, target and kind; the first one
       wins), then drop edges whose source or target is not a known node.
    4. Flatten the node dict into a list sorted by ``(type, id)``.
    5. Write the JSON file, print a per-type / per-kind summary and append
       an entry to ``LOG_PATH``.
    """
    # Local import: only the JSON fallback below needs the ``date`` class.
    from datetime import date

    def json_default(value):
        # PyYAML turns unquoted dates (document_date, date_start, ...) into
        # date/datetime objects, which json cannot encode on its own.
        if isinstance(value, date):
            return value.isoformat()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    ap = argparse.ArgumentParser(description="Export wiki graph (nodes + edges) as JSON.")
    ap.add_argument("--out", default=str(DEFAULT_OUT), help=f"output path (default: {DEFAULT_OUT})")
    args = ap.parse_args()

    graph: dict = {
        "generated_at": utc_now_iso(),
        "wiki_version": "0.1.0",
        "nodes": {},  # dict keyed by node_id for dedup, flattened to list at end
        "edges": [],
    }
    collect_documents(graph)
    collect_pages(graph)
    collect_entities(graph)
    collect_case_artifacts(graph)

    # Dedup edges
    edge_seen = set()
    deduped_edges = []
    for edge in graph["edges"]:
        key = (edge["source"], edge["target"], edge["kind"])
        if key in edge_seen:
            continue
        edge_seen.add(key)
        deduped_edges.append(edge)

    # Filter out edges pointing to nodes we don't have (broken refs)
    valid_ids = set(graph["nodes"])
    graph["edges"] = [
        edge for edge in deduped_edges
        if edge["source"] in valid_ids and edge["target"] in valid_ids
    ]
    dropped = len(deduped_edges) - len(graph["edges"])

    nodes_list = sorted(graph["nodes"].values(), key=lambda n: (n["type"], n["id"]))
    graph["nodes"] = nodes_list

    # Summary by type
    by_type: dict[str, int] = defaultdict(int)
    for node in nodes_list:
        by_type[node["type"]] += 1
    edges_by_kind: dict[str, int] = defaultdict(int)
    for edge in graph["edges"]:
        edges_by_kind[edge["kind"]] += 1

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(graph, indent=2, ensure_ascii=False, default=json_default),
        encoding="utf-8",
    )

    print(f"Graph written to {out_path}", flush=True)
    print(f" nodes total: {len(nodes_list)}", flush=True)
    for t, n in sorted(by_type.items()):
        print(f" {t}: {n}", flush=True)
    print(f" edges total: {len(graph['edges'])} (dropped {dropped} dangling)", flush=True)
    for k, n in sorted(edges_by_kind.items()):
        print(f" {k}: {n}", flush=True)

    # --out may point outside UFO_ROOT (or be relative); relative_to() raises
    # ValueError then, so log the resolved path instead of crashing after the
    # graph has already been written.
    try:
        logged_out = out_path.relative_to(UFO_ROOT)
    except ValueError:
        logged_out = out_path.resolve()

    with open(LOG_PATH, "a", encoding="utf-8") as fh:
        fh.write(f"\n## {utc_now_iso()} — GRAPH EXPORT\n")
        fh.write(f"- operator: archivist\n- script: scripts/06-graph-export.py\n")
        fh.write(f"- output: {logged_out}\n")
        fh.write(f"- nodes: {len(nodes_list)}\n- edges: {len(graph['edges'])}\n")
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()