394 lines
16 KiB
Python
Executable file
394 lines
16 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
14-build-document-md.py — Build wiki/documents/<doc-id>.md from its pages
|
|
|
|
Walks each document directory under wiki/pages/<doc-id>/ and aggregates its
|
|
page-level analyses into ONE consolidated document.md with:
|
|
|
|
- Frontmatter (doc_id, page_count, sha256, content_classification UNION,
|
|
highest_classification, languages_detected, redaction_codes_present,
|
|
war_gov block if available, executive_summary_confidence, key_entities aggregated)
|
|
- Page index (table linking to each [[<doc-id>/pNNN]])
|
|
- Aggregated entities (union of all entities_extracted across pages, deduped)
|
|
- Aggregated UAP observations (concat of all uap_observation_fields)
|
|
- Aggregated classification markings + redactions stats
|
|
- Optional Haiku-generated executive summary (bilingual EN + PT-BR)
|
|
|
|
A document is "ready" when its pages count == total_pages (from page frontmatter).
|
|
Idempotent: re-running updates last_lint timestamp only if substantive data changed.
|
|
|
|
Usage:
|
|
./14-build-document-md.py # all ready docs
|
|
./14-build-document-md.py --doc-id <id> # single
|
|
./14-build-document-md.py --force # rebuild even if exists
|
|
./14-build-document-md.py --with-llm-summary # planned: call Haiku for executive_summary (flag not yet accepted by this script's CLI)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import unicodedata
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("Missing pyyaml. pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Root of the workspace. Defaults to the original hard-coded location; set the
# UFO_ROOT environment variable to run against a different checkout. An empty
# value is treated as unset so the default still applies.
UFO_ROOT = Path(os.environ.get("UFO_ROOT") or "/Users/guto/ufo")

# Per-page markdown files live under wiki/pages/<doc-id>/.
PAGES_BASE = UFO_ROOT / "wiki" / "pages"
# Consolidated per-document markdown files are written here.
DOCS_BASE = UFO_ROOT / "wiki" / "documents"
# Not referenced by the code visible in this file.
PNG_BASE = UFO_ROOT / "processing" / "png"
# Original PDFs, searched when mapping a doc_id back to its source file.
RAW_DIR = UFO_ROOT / "raw"
# Append-only run log.
LOG_PATH = UFO_ROOT / "wiki" / "log.md"

# Emitted verbatim into document frontmatter as schema_version / wiki_version.
SCHEMA_VERSION = "0.1.0"
WIKI_VERSION = "0.1.0"
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as a ``YYYY-MM-DDTHH:MM:SSZ`` string (second resolution)."""
    now = datetime.now(timezone.utc).replace(microsecond=0)
    # An aware UTC datetime renders its offset as "+00:00"; the wiki uses "Z".
    return now.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into ``(frontmatter, body)``.

    The file is expected to open with a ``---`` line, followed by YAML, followed
    by a closing ``---`` that starts a line.

    Args:
        path: Markdown file to read (decoded as UTF-8).

    Returns:
        A tuple of the frontmatter mapping and the body text with leading
        newlines removed. The frontmatter is ``{}`` when the file has no
        frontmatter, the YAML is invalid, or the YAML is not a mapping. When no
        opening or closing delimiter is found, the body is the whole file.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}, text

    # The closing delimiter must begin a line. Searching for a bare "---" would
    # also match dashes embedded in a YAML value (e.g. ``title: a---b``) and
    # cut the frontmatter short.
    end = text.find("\n---", 3)
    if end == -1:
        return {}, text

    body = text[end + 4:].lstrip("\n")
    try:
        loaded = yaml.safe_load(text[3:end].strip())
    except yaml.YAMLError:
        return {}, body
    # Callers use ``.get`` on the result, so a list or scalar document is
    # treated the same as missing frontmatter.
    return (loaded if isinstance(loaded, dict) else {}), body
|
|
|
|
|
|
def write_md(
    path: Path,
    fm: dict,
    body: str,
    volatile_keys: tuple[str, ...] = ("last_ingest",),
) -> bool:
    """Write ``fm`` + ``body`` to ``path`` unless nothing substantive changed.

    Args:
        path: Destination markdown file; parent directories are created.
        fm: Frontmatter mapping, dumped as block-style YAML in insertion order.
        body: Markdown body. One blank line separates it from the frontmatter
            unless the body already starts with a newline.
        volatile_keys: Top-level frontmatter keys whose lines are ignored when
            deciding whether the file changed. The default covers the
            per-run ``last_ingest`` timestamp; pass ``()`` for exact comparison.

    Returns:
        ``True`` if the file was written, ``False`` if the existing file already
        matches apart from the volatile keys.
    """
    yaml_str = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    separator = "" if body.startswith("\n") else "\n"
    new = f"---\n{yaml_str}---\n{separator}{body}"

    def _stable(text: str) -> str:
        """Return ``text`` with the volatile ``key: value`` lines removed."""
        if not volatile_keys:
            return text
        keys = "|".join(re.escape(k) for k in volatile_keys)
        # Block-style YAML puts each top-level scalar on one line at column 0.
        # The pattern is applied to the whole text, so a body line that starts
        # with one of these keys is ignored as well.
        return re.sub(rf"^(?:{keys}):.*\n?", "", text, flags=re.MULTILINE)

    if path.exists():
        existing = path.read_text(encoding="utf-8")
        # Idempotency: an exact comparison would always differ because the
        # frontmatter carries a fresh timestamp on every run.
        if _stable(existing) == _stable(new):
            return False

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(new, encoding="utf-8")
    return True
|
|
|
|
|
|
def sha256_file(p: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``p``.

    The file is read in 64 KiB chunks rather than loaded whole.
    """
    digest = hashlib.sha256()
    with open(p, "rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
|
|
def find_pdf_for_doc(doc_id: str) -> Path | None:
    """Find the PDF in ``RAW_DIR`` whose slugified filename equals ``doc_id``.

    Returns the first match yielded by the ``*.pdf`` glob, or ``None`` if no
    file maps to ``doc_id``.
    """

    def _slugify(filename: str) -> str:
        """Turn a filename into a doc-id-style slug."""
        stem = filename.rsplit(".", 1)[0]
        # Decompose accented characters, then drop the combining marks.
        decomposed = unicodedata.normalize("NFKD", stem)
        unaccented = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
        # Anything outside [a-z0-9-] becomes a dash; dash runs collapse to one.
        slug = re.sub(r"[^a-z0-9-]", "-", unaccented.lower())
        slug = re.sub(r"-+", "-", slug).strip("-")
        # Slugs that would start with a digit get a "doc-" prefix.
        if slug and slug[0].isdigit():
            return "doc-" + slug
        return slug

    matches = (pdf for pdf in RAW_DIR.glob("*.pdf") if _slugify(pdf.name) == doc_id)
    return next(matches, None)
|
|
|
|
|
|
def list_doc_dirs() -> list[Path]:
    """Return the sorted subdirectories of ``PAGES_BASE`` (one per doc-id).

    Returns an empty list when ``PAGES_BASE`` does not exist.
    """
    if PAGES_BASE.exists():
        subdirs = [entry for entry in PAGES_BASE.iterdir() if entry.is_dir()]
        subdirs.sort()
        return subdirs
    return []
|
|
|
|
|
|
MAX_MISSING_PAGES_TOLERATED = 3  # p000 OCR-missing cases are structural; tolerate small gaps


def is_doc_complete(doc_dir: Path) -> tuple[bool, int, int]:
    """Return ``(is_complete, pages_done, total_pages_expected)``.

    A doc is considered complete if it has at most MAX_MISSING_PAGES_TOLERATED
    pages missing (typically the p000 with missing OCR — structurally
    irrecoverable), and at least one page present.

    The expected total comes from the first page, in filename order, whose
    frontmatter carries a positive, integer-convertible ``total_pages``. Pages
    with missing or malformed frontmatter are skipped rather than making the
    whole document permanently "not ready".

    Args:
        doc_dir: Directory holding the document's ``p*.md`` page files.

    Returns:
        ``(False, 0, 0)`` when there are no page files; ``(False, n, 0)`` when
        no page declares a usable ``total_pages``; otherwise the completeness
        verdict with the page count found and the expected total.
    """
    pages = sorted(doc_dir.glob("p*.md"))
    if not pages:
        return False, 0, 0

    done = len(pages)
    total = 0
    for page in pages:
        fm, _ = read_md(page)
        if not isinstance(fm, dict):
            continue
        try:
            # YAML may yield the count as a string (e.g. "12"); coerce it.
            candidate = int(fm.get("total_pages") or 0)
        except (TypeError, ValueError):
            continue
        if candidate > 0:
            total = candidate
            break

    if total <= 0:
        return False, done, total
    return done >= max(1, total - MAX_MISSING_PAGES_TOLERATED), done, total
|
|
|
|
|
|
def aggregate(doc_id: str, doc_dir: Path) -> dict:
    """Walk all pages of a document and aggregate their frontmatter fields.

    Pages whose frontmatter is empty or unreadable are skipped, but still count
    towards ``page_count``, which is the number of ``p*.md`` files. Malformed
    field values (null lists, scalar-instead-of-list, non-mapping entries,
    YAML-parsed datetimes) are tolerated instead of raising.

    Args:
        doc_id: Document identifier, copied into the result.
        doc_dir: Directory holding the document's ``p*.md`` page files.

    Returns:
        A dict with the per-page index (``pages``), unions
        (``content_classification``, ``languages_detected``), counters
        (``redaction_codes``, ``classification_levels``, ``page_types``,
        ``flags``, ``entities[class][name]``), totals, the list of UAP
        observations (each tagged with ``_page``), the earliest and latest
        ``vision_run_at`` as strings, and the mean OCR / vision quality scores
        rounded to 3 decimals (0.0 when no page has a numeric score).
    """

    def _as_list(value) -> list:
        """Coerce a frontmatter value to a list: falsy -> [], scalar -> [value]."""
        if not value:
            return []
        if isinstance(value, (list, tuple, set)):
            return list(value)
        return [value]

    def _ts_to_str(value) -> str:
        """Render a timestamp as text so values compare and slice uniformly."""
        if isinstance(value, datetime):
            # Unquoted ISO timestamps come back from YAML as datetime objects.
            if value.tzinfo is not None:
                value = value.astimezone(timezone.utc)
            return value.strftime("%Y-%m-%dT%H:%M:%SZ")
        return str(value)

    pages = sorted(doc_dir.glob("p*.md"))
    agg = {
        "doc_id": doc_id,
        "page_count": len(pages),
        "pages": [],
        "content_classification": set(),
        "languages_detected": set(),
        "redaction_codes": Counter(),
        "classification_levels": Counter(),
        "page_types": Counter(),
        "entities": defaultdict(Counter),  # entities['people']['name'] = count
        "uap_observations": [],
        "first_vision_run_at": None,
        "last_vision_run_at": None,
        "total_redactions": 0,
        "total_signatures": 0,
        "total_tables": 0,
        "total_images_detected": 0,
        "ocr_quality_avg": 0.0,
        "vision_quality_avg": 0.0,
        "flags": Counter(),
    }
    ocr_scores = []
    vis_scores = []

    for p in pages:
        fm, _ = read_md(p)
        if not fm or not isinstance(fm, dict):
            continue

        m = re.match(r"p(\d+)", p.stem)
        page_num = int(m.group(1)) if m else 0
        # A null value must not reach the page index (it is joined later), and
        # a bare string must not be iterated character by character.
        page_classes = _as_list(fm.get("content_classification"))
        agg["pages"].append({
            "page": page_num,
            "page_id": f"[[{fm.get('page_id','')}]]",
            "page_type": fm.get("page_type", "unknown"),
            "content_classification": page_classes,
            "language_detected": fm.get("language_detected", "unknown"),
        })

        agg["content_classification"].update(page_classes)

        lang = fm.get("language_detected")
        if lang and lang != "unknown":
            agg["languages_detected"].add(lang)

        agg["page_types"][fm.get("page_type", "unknown")] += 1

        for redaction in _as_list(fm.get("redactions")):
            agg["total_redactions"] += 1
            # Every entry counts as a redaction; only mappings carry a code.
            code = redaction.get("code") if isinstance(redaction, dict) else None
            if code:
                agg["redaction_codes"][code] += 1

        for marking in _as_list(fm.get("classification_markings")):
            level = marking.get("level") if isinstance(marking, dict) else None
            if level:
                agg["classification_levels"][level] += 1

        agg["total_signatures"] += len(fm.get("signatures_observed") or [])
        agg["total_tables"] += len(fm.get("tables_detected") or [])
        agg["total_images_detected"] += len(fm.get("images_detected") or [])

        entities = fm.get("entities_extracted")
        if isinstance(entities, dict):
            for ent_class, items in entities.items():
                for item in _as_list(items):
                    if not isinstance(item, dict):
                        continue
                    # The display key varies by entity class.
                    name = item.get("name") or item.get("label") or item.get("shape")
                    if name:
                        agg["entities"][ent_class][name] += 1

        uap_fields = fm.get("uap_observation_fields")
        if isinstance(uap_fields, dict) and uap_fields:
            uap = dict(uap_fields)
            uap["_page"] = page_num
            agg["uap_observations"].append(uap)

        ocr = fm.get("ocr_quality_score")
        if isinstance(ocr, (int, float)):
            ocr_scores.append(ocr)
        vis = fm.get("vision_quality_score")
        if isinstance(vis, (int, float)):
            vis_scores.append(vis)

        for flag in _as_list(fm.get("flags")):
            agg["flags"][flag] += 1

        run_at = fm.get("vision_run_at")
        if run_at:
            run_at = _ts_to_str(run_at)
            if not agg["first_vision_run_at"] or run_at < agg["first_vision_run_at"]:
                agg["first_vision_run_at"] = run_at
            if not agg["last_vision_run_at"] or run_at > agg["last_vision_run_at"]:
                agg["last_vision_run_at"] = run_at

    agg["ocr_quality_avg"] = round(sum(ocr_scores) / len(ocr_scores), 3) if ocr_scores else 0.0
    agg["vision_quality_avg"] = round(sum(vis_scores) / len(vis_scores), 3) if vis_scores else 0.0
    return agg
|
|
|
|
|
|
def highest_classification(level_counter: Counter) -> str:
    """Return the most restrictive classification level present.

    Levels are matched case-insensitively and by leading phrase, so markings
    that carry caveats (``SECRET//NOFORN``) or differ in case (``Top Secret``)
    are recognised instead of silently falling back to ``UNCLASSIFIED``.

    Args:
        level_counter: Mapping of marking text to occurrence count. Entries
            with a count of zero or less, or non-string keys, are ignored.

    Returns:
        One of ``TOP SECRET``, ``SECRET``, ``CONFIDENTIAL``, ``CUI``,
        ``UNCLASSIFIED``; ``UNCLASSIFIED`` when nothing matches.
    """
    order = ["TOP SECRET", "SECRET", "CONFIDENTIAL", "CUI", "UNCLASSIFIED"]

    # Upper-case and collapse whitespace so "top  secret" equals "TOP SECRET".
    seen = {
        " ".join(raw.upper().split())
        for raw, count in level_counter.items()
        if isinstance(raw, str) and count > 0
    }

    def _matches(marking: str, level: str) -> bool:
        """True if ``marking`` is ``level``, optionally followed by a caveat."""
        if not marking.startswith(level):
            return False
        rest = marking[len(level):]
        # Require a non-alphanumeric boundary so "SECRETARY" is not "SECRET".
        return not rest or not rest[0].isalnum()

    # Most restrictive first, so "TOP SECRET" is tested before plain "SECRET".
    for lv in order:
        if any(_matches(marking, lv) for marking in seen):
            return lv
    return "UNCLASSIFIED"
|
|
|
|
|
|
def render_document_md(doc_id: str, agg: dict, pdf_path: Path | None) -> tuple[dict, str]:
    """Compose document.md frontmatter + body from aggregated data.

    Args:
        doc_id: Document identifier; also the source of the title-cased
            ``canonical_title``.
        agg: Aggregate dict as produced by ``aggregate``.
        pdf_path: The original PDF if one was located, else ``None``. Its
            SHA-256 and size are recorded only when the file exists.

    Returns:
        ``(frontmatter, body)``: the frontmatter dict, whose insertion order is
        the intended on-disk key order, and the markdown body.
    """

    def _top(kind: str, limit: int) -> list:
        """Most frequent entity names of ``kind``, at most ``limit`` of them."""
        return [name for name, _ in agg["entities"].get(kind, Counter()).most_common(limit)]

    top_people = _top("people", 20)
    top_orgs = _top("organizations", 20)
    top_locs = _top("locations", 20)
    top_events = _top("events", 10)
    top_uap = _top("uap_objects", 10)
    top_vehicles = _top("vehicles", 10)
    top_concepts = _top("concepts", 20)

    pdf_on_disk = pdf_path is not None and pdf_path.exists()
    last_run = agg["last_vision_run_at"]

    fm = {
        "schema_version": SCHEMA_VERSION,
        "type": "document",
        "doc_id": doc_id,
        "canonical_title": doc_id.replace("-", " ").title(),
        "original_filename": pdf_path.name if pdf_path else None,
        "raw_path": f"../raw/{pdf_path.name}" if pdf_path else None,
        "sha256": sha256_file(pdf_path) if pdf_on_disk else None,
        "size_bytes": pdf_path.stat().st_size if pdf_on_disk else None,
        "page_count": agg["page_count"],
        "mime_type": "application/pdf",
        "collection": "DOW-UAP",  # TODO: infer from doc_id prefix
        "document_class": "unknown",
        "content_classification": sorted(agg["content_classification"]),
        "highest_classification": highest_classification(agg["classification_levels"]),
        "languages_detected": sorted(agg["languages_detected"]),
        "has_redactions": agg["total_redactions"] > 0,
        "redaction_codes_present": sorted(agg["redaction_codes"].keys()),
        "redactions_total": agg["total_redactions"],
        "signatures_total": agg["total_signatures"],
        "tables_total": agg["total_tables"],
        "images_detected_total": agg["total_images_detected"],
        "page_types_histogram": dict(agg["page_types"]),
        "ocr_quality_avg": agg["ocr_quality_avg"],
        "vision_quality_avg": agg["vision_quality_avg"],
        "flags": dict(agg["flags"]),
        "first_vision_run_at": agg["first_vision_run_at"],
        "last_vision_run_at": last_run,
        # str() first: YAML can hand back a datetime, which cannot be sliced.
        "ingest_date": str(last_run)[:10] if last_run else None,
        "last_ingest": utc_now_iso(),
        "wiki_version": WIKI_VERSION,
        "key_entities": {
            "people": top_people,
            "organizations": top_orgs,
            "locations": top_locs,
            "events": top_events,
            "uap_objects": top_uap,
            "vehicles": top_vehicles,
            "concepts": top_concepts,
        },
        "uap_observations_count": len(agg["uap_observations"]),
        "pages": [{"page": p["page"], "page_id": p["page_id"], "page_type": p["page_type"]} for p in agg["pages"]],
    }

    # Body: collect fragments and join once instead of repeated string +=.
    parts = [
        f"# {fm['canonical_title']}\n\n",
        f"> **{agg['page_count']}-page document** · {fm['highest_classification']} · {len(agg['content_classification'])} content categories · {agg['total_redactions']} redactions across pages\n\n",
        "## Quick stats\n\n",
        f"- **Pages**: {agg['page_count']}\n",
        f"- **Languages**: {', '.join(sorted(agg['languages_detected'])) or 'n/a'}\n",
        f"- **Page types**: {dict(agg['page_types'])}\n",
        f"- **Redaction codes**: {dict(agg['redaction_codes'])}\n",
        f"- **Classification levels seen**: {dict(agg['classification_levels'])}\n",
        f"- **Signatures observed**: {agg['total_signatures']}\n",
        f"- **Tables detected**: {agg['total_tables']}\n",
        f"- **Images detected**: {agg['total_images_detected']}\n",
        f"- **OCR quality (avg)**: {agg['ocr_quality_avg']}\n",
        f"- **Vision quality (avg)**: {agg['vision_quality_avg']}\n\n",
        "## Key entities (aggregated across all pages)\n\n",
    ]

    for label, names in [
        ("People", top_people), ("Organizations", top_orgs), ("Locations", top_locs),
        ("Events", top_events), ("UAP objects", top_uap), ("Vehicles", top_vehicles),
        ("Concepts", top_concepts),
    ]:
        if names:
            parts.append(f"### {label}\n\n")
            parts.extend(f"- {item}\n" for item in names)
            parts.append("\n")

    if agg["uap_observations"]:
        parts.append("## UAP observations across pages\n\n")
        for u in agg["uap_observations"]:
            p = u.get("_page", "?")
            shape = u.get("shape") or "unknown"
            color = u.get("color") or ""
            alt = u.get("altitude_ft")
            spd = u.get("speed_kts")
            parts.append(f"- **Page {p}**: shape=`{shape}` color=`{color}` altitude={alt} speed={spd}\n")
        parts.append("\n")

    parts.append("## Page index\n\n")
    parts.append("| Page | Type | Classification |\n|---|---|---|\n")
    for p in agg["pages"]:
        # Page frontmatter may carry null or a bare string here; normalise so
        # the join neither raises nor splits a string into characters.
        classes = p["content_classification"] or []
        if isinstance(classes, str):
            classes = [classes]
        cc = ", ".join(str(c) for c in classes) or "—"
        parts.append(f"| {p['page_id']} | `{p['page_type']}` | {cc} |\n")
    parts.append("\n")

    parts.append("## Notes\n\n")
    parts.append("Document.md is a **consolidated view** of all pages. For per-page detail (OCR text, vision description bilingual, entities, etc.), open the individual `wiki/pages/<doc-id>/p<NNN>.md` files linked in the page index above.\n")

    return fm, "".join(parts)
|
|
|
|
|
|
def process_doc(doc_dir: Path, force: bool) -> bool:
    """Build or refresh the consolidated document.md for one document.

    The document is skipped when it is not yet complete, or when (without
    ``force``) the existing output is at least as new as the newest page file.

    Args:
        doc_dir: The document's pages directory; its name is the doc_id.
        force: Rebuild even if the output looks up to date.

    Returns:
        ``True`` if the output file was written, ``False`` otherwise.
    """
    doc_id = doc_dir.name
    ready, pages_done, pages_total = is_doc_complete(doc_dir)
    if not ready:
        print(f" ⏳ {doc_id}: {pages_done}/{pages_total} pages — not ready, skipping")
        return False

    target = DOCS_BASE / f"{doc_id}.md"
    if target.exists() and not force:
        # Up to date when the output is no older than the newest page file.
        page_mtimes = [page.stat().st_mtime for page in doc_dir.glob("p*.md")]
        newest_page = max(page_mtimes, default=0)
        if target.stat().st_mtime >= newest_page:
            return False  # already up-to-date

    print(f" 📄 {doc_id}: {pages_done}/{pages_total} pages — building document.md")
    aggregated = aggregate(doc_id, doc_dir)
    source_pdf = find_pdf_for_doc(doc_id)
    frontmatter, body = render_document_md(doc_id, aggregated, source_pdf)

    wrote = write_md(target, frontmatter, body)
    if wrote:
        print(f" ✓ {target.relative_to(UFO_ROOT)}")
    return wrote
|
|
|
|
|
|
def main():
    """CLI entry point: build document.md for one doc (``--doc-id``) or all docs.

    Appends a summary entry to the wiki log when at least one document was
    written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--doc-id", help="single doc")
    parser.add_argument("--force", action="store_true")
    opts = parser.parse_args()

    DOCS_BASE.mkdir(parents=True, exist_ok=True)
    if opts.doc_id:
        targets = [PAGES_BASE / opts.doc_id]
    else:
        targets = list_doc_dirs()
    print(f"Processing {len(targets)} doc(s)…")

    built = 0
    for doc_dir in targets:
        if doc_dir.exists():
            if process_doc(doc_dir, opts.force):
                built += 1
        else:
            sys.stderr.write(f" ✗ no pages dir for {doc_dir.name}\n")
    print(f"\nBuilt/updated: {built} document.md")

    if built <= 0:
        return
    with open(LOG_PATH, "a", encoding="utf-8") as log:
        entry = (
            f"\n## {utc_now_iso()} — BUILD DOCUMENT.MD (Phase 4)\n"
            f"- operator: archivist + case-writer\n- script: scripts/14-build-document-md.py\n"
            f"- documents_built: {built}\n"
        )
        log.write(entry)


if __name__ == "__main__":
    main()
|