422 lines
14 KiB
Python
Executable file
422 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
06-graph-export.py — Export a graph JSON of the wiki for client-side viz
|
|
|
|
Walks wiki/ and case/, builds:
|
|
nodes:
|
|
- one per document
|
|
- one per entity (person, organization, location, event, uap_object,
|
|
vehicle, operation, concept)
|
|
- one per gap, evidence, hypothesis, witness, profile (case artifacts)
|
|
edges:
|
|
- document → page (contains)
|
|
- entity → page (mentioned-in, via each entity's mentioned_in[])
|
|
- entity → entity (related_*, observed_in_event, primary_location, etc.)
|
|
- relation node nodes[] → its members (connect-the-dots)
|
|
- gap.detected_in[] → page/document
|
|
|
|
Output:
|
|
wiki/graph.json
|
|
|
|
The JSON shape is friendly to Cytoscape / Sigma.js / react-flow. Each node
carries `id`, `type`, `label`, and a `data` object holding `entity_class`
(for entity nodes) plus the frontmatter fields useful for filters (collection,
country, date, confidence, etc.).
|
|
|
|
Usage:
|
|
./06-graph-export.py
|
|
./06-graph-export.py --out /path/to/graph.json
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Filesystem layout of the project; every other path hangs off UFO_ROOT.
UFO_ROOT = Path("/Users/guto/ufo")
WIKI_BASE = UFO_ROOT.joinpath("wiki")
CASE_BASE = UFO_ROOT.joinpath("case")
DEFAULT_OUT = WIKI_BASE.joinpath("graph.json")  # used when --out is not given
LOG_PATH = WIKI_BASE.joinpath("log.md")  # main() appends one entry per run

# Matches [[target]] and [[target|label]]; group 1 captures the target only.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]")
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now = datetime.now(timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def read_md(path: Path) -> tuple[dict, str]:
    """Split a Markdown file into ``(frontmatter, body)``.

    The frontmatter is the YAML between a leading ``---`` and the next line
    that starts with ``---``.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        A ``(frontmatter, body)`` tuple. ``frontmatter`` is always a dict; it
        is empty when the file has no frontmatter, the fence is never closed,
        the YAML is invalid, or the YAML is not a mapping. ``body`` is the
        text after the closing fence, or the whole file when no frontmatter
        was recognised.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}, text

    # Only accept a closing fence at the start of a line, so a "---" inside
    # a YAML value (e.g. a title containing a dash run) cannot truncate the
    # frontmatter.
    closing = text.find("\n---", 3)
    if closing == -1:
        return {}, text

    body = text[closing + 4 :]
    try:
        meta = yaml.safe_load(text[3:closing].strip())
    except yaml.YAMLError:
        return {}, body

    # Callers use fm.get(...); a scalar or list frontmatter would crash them.
    if not isinstance(meta, dict):
        meta = {}
    return meta, body
|
|
|
|
|
|
def parse_wiki_link(target: str) -> tuple[str, str] | None:
|
|
"""Extract (namespace, id) from a wiki-link target string.
|
|
Returns None for unrecognized targets.
|
|
"""
|
|
t = target.strip()
|
|
# Page link: doc-id/pNNN
|
|
m = re.match(r"^([a-z0-9][a-z0-9-]*)/p\d{3}$", t)
|
|
if m:
|
|
return ("page", t)
|
|
if "/" in t:
|
|
ns, rest = t.split("/", 1)
|
|
return (ns, rest)
|
|
# Bare doc_id
|
|
return ("document", t)
|
|
|
|
|
|
def node_id_from_link(target: str) -> str | None:
    """Translate a wiki-link target into the graph's ``namespace:id`` node id.

    Returns None when ``parse_wiki_link`` yields nothing for the target.
    """
    pair = parse_wiki_link(target)
    if pair:
        namespace, ident = pair
        return f"{namespace}:{ident}"
    return None
|
|
|
|
|
|
def make_node(node_id: str, ntype: str, label: str, **extra) -> dict:
    """Build a node record; keyword extras, if any, are stored under ``data``."""
    node = dict(id=node_id, type=ntype, label=label)
    if not extra:
        return node
    node["data"] = extra
    return node
|
|
|
|
|
|
def make_edge(source: str, target: str, kind: str, weight: float = 1.0) -> dict:
    """Build a directed edge record from ``source`` to ``target``."""
    return dict(source=source, target=target, kind=kind, weight=weight)
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
|
|
def collect_documents(graph: dict):
    """Add a node for every document file and the edges it declares.

    Reads each ``*.md`` directly under ``WIKI_BASE / "documents"`` whose
    frontmatter has ``type: document`` and emits:

    - a ``document:<doc_id>`` node carrying filter fields in ``data``;
    - ``contains`` edges from the wiki-links in ``pages[].page_id``;
    - ``key-entity`` edges from every list under ``key_entities``;
    - ``flags-gap`` edges from ``gaps_flagged``.

    Frontmatter values of an unexpected type are skipped instead of raising.

    Args:
        graph: Export accumulator; ``graph["nodes"]`` (dict keyed by node id)
            and ``graph["edges"]`` (list) are mutated in place.
    """

    def as_refs(value) -> list:
        """Normalise a frontmatter field to a list (a lone string counts as one ref)."""
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [value]
        return []

    def add_link_edge(source: str, ref, kind: str) -> None:
        """Append a ``kind`` edge to the first wiki-link found in ``ref``, if any."""
        if not isinstance(ref, str):
            return
        found = WIKI_LINK_RE.search(ref)
        if not found:
            return
        target = node_id_from_link(found.group(1))
        if target:
            graph["edges"].append(make_edge(source, target, kind))

    docs_dir = WIKI_BASE / "documents"
    for path in sorted(docs_dir.glob("*.md")):
        fm, _ = read_md(path)
        if fm.get("type") != "document":
            continue

        # Fall back to the file stem when doc_id is absent *or* null, the same
        # way entity and case ids fall back elsewhere in this file.
        doc_id = fm.get("doc_id") or path.stem
        node_id = f"document:{doc_id}"
        graph["nodes"][node_id] = make_node(
            node_id, "document",
            fm.get("canonical_title") or doc_id,
            collection=fm.get("collection"),
            document_class=fm.get("document_class"),
            page_count=fm.get("page_count"),
            content_classification=fm.get("content_classification"),
            document_date=fm.get("document_date"),
            highest_classification=fm.get("highest_classification"),
        )

        # document → page edges; each entry is a mapping whose page_id holds
        # a [[doc/pNNN]] link.
        for page_ref in as_refs(fm.get("pages")):
            if isinstance(page_ref, dict):
                add_link_edge(node_id, page_ref.get("page_id"), "contains")

        # document → key entities; the class names used as keys are not
        # needed because the link itself carries the namespace.
        key_entities = fm.get("key_entities")
        if isinstance(key_entities, dict):
            for refs in key_entities.values():
                for ref in as_refs(refs):
                    add_link_edge(node_id, ref, "key-entity")

        # document → gaps it flags
        for ref in as_refs(fm.get("gaps_flagged")):
            add_link_edge(node_id, ref, "flags-gap")
|
|
|
|
|
|
def collect_pages(graph: dict):
    """Add one ``page:<page_id>`` node per page file under ``WIKI_BASE / "pages"``.

    Files are searched recursively. A file is skipped unless its frontmatter
    has ``type: page`` and a non-empty ``page_id``. The node label is
    ``"<doc_id> p<NNN>"`` with the page number left-padded with zeros to
    three characters; ``?`` stands in for a missing doc id or page number.

    No edges are added here; see the comment at the end of the loop body.

    Args:
        graph: Export accumulator; ``graph["nodes"]`` is mutated in place.
    """
    for path in sorted((WIKI_BASE / "pages").rglob("*.md")):
        fm, _ = read_md(path)
        if fm.get("type") != "page":
            continue
        page_id = fm.get("page_id")
        if not page_id:
            continue

        # A null page_number cannot be formatted with a width spec, so it is
        # mapped to "?" before padding.
        page_number = fm.get("page_number")
        number_text = "?" if page_number is None else str(page_number)
        doc_label = fm.get("doc_id") or "?"

        node_id = f"page:{page_id}"
        graph["nodes"][node_id] = make_node(
            node_id, "page",
            f"{doc_label} p{number_text.rjust(3, '0')}",
            page_number=page_number,
            page_type=fm.get("page_type"),
            content_classification=fm.get("content_classification"),
            language_detected=fm.get("language_detected"),
        )
        # page → entity edges are deliberately not built from
        # entities_extracted: resolving those names to canonical entity ids
        # would need the alias-match lookup. The lint script's mentioned_in[]
        # is the source of truth for who-mentions-who, so the edges are added
        # in the entity → page direction by collect_entities().
|
|
|
|
|
|
def collect_entities(graph: dict):
    """Add one node per entity file plus the edges its frontmatter declares.

    Reads every ``*.md`` (recursively) under ``WIKI_BASE / "entities"`` whose
    frontmatter has ``type: entity`` and a known ``entity_class``, and emits:

    - a ``<ns>:<id>`` node (the id falls back to the file stem);
    - ``mentioned-in`` edges from ``mentioned_in[].page``, weighted by
      ``mention_count`` (1 when absent or not a number);
    - for events: ``occurred-at`` from ``primary_location`` and
      ``observed-uap`` from ``uap_objects[]``;
    - for UAP objects: ``observed-in-event`` from ``observed_in_event``.

    Frontmatter values of an unexpected type are skipped instead of raising.

    Args:
        graph: Export accumulator; ``graph["nodes"]`` (dict keyed by node id)
            and ``graph["edges"]`` (list) are mutated in place.
    """
    # entity_class -> (frontmatter key holding the id, node namespace).
    # Built once; it does not depend on the file being processed.
    class_table = {
        "person": ("person_id", "people"),
        "organization": ("organization_id", "org"),
        "location": ("location_id", "loc"),
        "event": ("event_id", "event"),
        "uap_object": ("uap_object_id", "uap"),
        "vehicle": ("vehicle_id", "vehicle"),
        "operation": ("operation_id", "op"),
        "concept": ("concept_id", "concept"),
    }

    def as_refs(value) -> list:
        """Normalise a frontmatter field to a list (a lone string counts as one ref)."""
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [value]
        return []

    def add_link_edge(source: str, ref, kind: str, weight: float = 1.0) -> None:
        """Append a ``kind`` edge to the first wiki-link found in ``ref``, if any."""
        if not isinstance(ref, str):
            return
        found = WIKI_LINK_RE.search(ref)
        if not found:
            return
        target = node_id_from_link(found.group(1))
        if target:
            graph["edges"].append(make_edge(source, target, kind, weight))

    entities_root = WIKI_BASE / "entities"
    for path in sorted(entities_root.rglob("*.md")):
        fm, _ = read_md(path)
        if fm.get("type") != "entity":
            continue
        ec = fm.get("entity_class")
        # Non-string classes (e.g. a YAML list) are unhashable and unknown.
        if not isinstance(ec, str) or ec not in class_table:
            continue
        id_field, ns = class_table[ec]

        eid = fm.get(id_field) or path.stem
        node_id = f"{ns}:{eid}"
        graph["nodes"][node_id] = make_node(
            node_id, ns,
            fm.get("canonical_name") or eid,
            entity_class=ec,
            aliases=fm.get("aliases"),
            country=fm.get("country"),
            location_type=fm.get("location_type"),
            organization_type=fm.get("organization_type"),
            shape=fm.get("shape"),
            color=fm.get("color"),
            event_class=fm.get("event_class"),
            date_start=fm.get("date_start"),
            concept_class=fm.get("concept_class"),
            total_mentions=fm.get("total_mentions"),
            enrichment_status=fm.get("enrichment_status"),
        )

        # entity → page edges (via mentioned_in[])
        for mention in as_refs(fm.get("mentioned_in")):
            if not isinstance(mention, dict):
                continue
            count = mention.get("mention_count")
            # A null or non-numeric count would otherwise be exported as the
            # edge weight verbatim.
            if isinstance(count, bool) or not isinstance(count, (int, float)):
                count = 1
            add_link_edge(node_id, mention.get("page"), "mentioned-in", count)

        # event-specific links
        if ec == "event":
            add_link_edge(node_id, fm.get("primary_location"), "occurred-at")
            for obj in as_refs(fm.get("uap_objects")):
                add_link_edge(node_id, obj, "observed-uap")

        # uap_object → event
        if ec == "uap_object":
            add_link_edge(node_id, fm.get("observed_in_event"), "observed-in-event")
|
|
|
|
|
|
def collect_case_artifacts(graph: dict):
    """Add gaps, evidence, witnesses, hypotheses, profiles, relations.

    Walks every ``*.md`` under ``CASE_BASE`` recursively. A file whose
    frontmatter ``type`` is one of the known artifact types becomes a
    ``<ns>:<id>`` node; gap files additionally get ``detected-in`` edges and
    relation files get ``relates`` edges. ``graph`` is mutated in place.
    """
    # frontmatter type -> node namespace
    namespace_for = {
        "gap": "gap",
        "evidence": "evidence",
        "witness_analysis": "witness",
        "hypothesis": "hypothesis",
        "actor_profile": "profile",
        "relation": "relation",
        "case_report": "case",
        "residual_uncertainty": "case",
        "timeline": "timeline",
    }
    # frontmatter type -> key holding the artifact id; types without an
    # entry here use the file stem as their id.
    id_key_for = {
        "gap": "gap_id",
        "evidence": "evidence_id",
        "witness_analysis": "witness_id",
        "hypothesis": "hypothesis_id",
        "actor_profile": "actor_profile_id",
        "relation": "relation_id",
        "case_report": "case_id",
        "timeline": "scope_id",
    }
    # frontmatter type -> (list field holding wiki-links, edge kind to emit)
    link_field_for = {
        "gap": ("detected_in", "detected-in"),
        "relation": ("nodes", "relates"),
    }

    for path in sorted(CASE_BASE.rglob("*.md")):
        meta, _ = read_md(path)
        kind = meta.get("type")
        if kind not in namespace_for:
            continue
        ns = namespace_for[kind]

        id_key = id_key_for.get(kind)
        artifact_id = meta.get(id_key) if id_key else None
        if not artifact_id:
            artifact_id = path.stem

        node_id = f"{ns}:{artifact_id}"
        graph["nodes"][node_id] = make_node(
            node_id, ns,
            meta.get("canonical_title", artifact_id),
            t=kind,
            severity=meta.get("severity"),
            evidence_grade=meta.get("evidence_grade"),
            status=meta.get("status"),
            verdict=meta.get("verdict"),
            connection_strength=meta.get("connection_strength"),
        )

        # Only gaps and relations carry outgoing links.
        if kind not in link_field_for:
            continue
        field, edge_kind = link_field_for[kind]
        for ref in meta.get(field) or []:
            if not isinstance(ref, str):
                continue
            found = WIKI_LINK_RE.search(ref)
            if not found:
                continue
            target = node_id_from_link(found.group(1))
            if target:
                graph["edges"].append(make_edge(node_id, target, edge_kind))
|
|
|
|
|
|
# ----------------------------------------------------------------------
|
|
|
|
def _json_default(value):
    """Serialise values ``json`` cannot handle natively.

    YAML parses unquoted ISO dates (for example a ``document_date`` or
    ``date_start`` field) into ``date``/``datetime`` objects; they are
    written as ISO-8601 strings. Any other type is still rejected.
    """
    from datetime import date  # datetime is a subclass of date

    if isinstance(value, date):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def main():
    """Build the graph from wiki/ and case/, write it as JSON, and log the run.

    Steps: parse ``--out``; run the four collectors; drop duplicate edges
    (same source, target and kind; first occurrence wins) and edges whose
    endpoints have no node; sort nodes by ``(type, id)``; write the JSON;
    print a summary; append an entry to ``LOG_PATH``.
    """
    ap = argparse.ArgumentParser(description="Export wiki graph (nodes + edges) as JSON.")
    ap.add_argument("--out", default=str(DEFAULT_OUT), help=f"output path (default: {DEFAULT_OUT})")
    args = ap.parse_args()

    graph: dict = {
        "generated_at": utc_now_iso(),
        "wiki_version": "0.1.0",
        "nodes": {},  # dict keyed by node_id for dedup, flattened to list at end
        "edges": [],
    }

    collect_documents(graph)
    collect_pages(graph)
    collect_entities(graph)
    collect_case_artifacts(graph)

    # Dedup edges, then filter out those pointing to nodes we don't have
    # (broken refs). `dropped` counts dangling edges after deduplication.
    valid_ids = set(graph["nodes"])
    edge_seen = set()
    kept_edges = []
    dropped = 0
    for e in graph["edges"]:
        key = (e["source"], e["target"], e["kind"])
        if key in edge_seen:
            continue
        edge_seen.add(key)
        if e["source"] in valid_ids and e["target"] in valid_ids:
            kept_edges.append(e)
        else:
            dropped += 1
    graph["edges"] = kept_edges

    nodes_list = sorted(graph["nodes"].values(), key=lambda n: (n["type"], n["id"]))
    graph["nodes"] = nodes_list

    # Summary by type
    by_type: dict[str, int] = defaultdict(int)
    for n in nodes_list:
        by_type[n["type"]] += 1
    edges_by_kind: dict[str, int] = defaultdict(int)
    for e in graph["edges"]:
        edges_by_kind[e["kind"]] += 1

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(graph, indent=2, ensure_ascii=False, default=_json_default),
        encoding="utf-8",
    )

    print(f"Graph written to {out_path}", flush=True)
    print(f" nodes total: {len(nodes_list)}", flush=True)
    for t, n in sorted(by_type.items()):
        print(f" {t}: {n}", flush=True)
    print(f" edges total: {len(graph['edges'])} (dropped {dropped} dangling)", flush=True)
    for k, n in sorted(edges_by_kind.items()):
        print(f" {k}: {n}", flush=True)

    # --out may point outside the project tree (or be a relative path);
    # relative_to() raises ValueError then, so log the path as given.
    try:
        logged_out = out_path.relative_to(UFO_ROOT)
    except ValueError:
        logged_out = out_path

    with open(LOG_PATH, "a", encoding="utf-8") as fh:
        fh.write(f"\n## {utc_now_iso()} — GRAPH EXPORT\n")
        fh.write("- operator: archivist\n- script: scripts/06-graph-export.py\n")
        fh.write(f"- output: {logged_out}\n")
        fh.write(f"- nodes: {len(nodes_list)}\n- edges: {len(graph['edges'])}\n")
|
|
|
|
|
|
# Run the export only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
|