disclosure-bureau/scripts/00b-coverage-report.py

250 lines
9.6 KiB
Python
Executable file

#!/usr/bin/env python3
"""
00b-coverage-report.py — Compare /Users/guto/ufo/raw/ against the war.gov
metadata JSON extracted by 00-extract-war-gov.js.

Reports:
  - documents in war.gov that ARE present in raw/
  - documents in war.gov that are MISSING from raw/ (need to be downloaded)
  - files in raw/ that DO NOT appear in war.gov (manual additions / older releases / renamed)

Matching is lenient: both sides normalize to ASCII-folded lowercase kebab-case,
with extra noise stripped. For each war.gov document we try, in order: an exact
normalized match on the title or asset file name, a shared primary-ID prefix
(e.g. DOW-UAP-D74), substring containment, and finally token-set Jaccard
similarity.

Usage:
    ./00b-coverage-report.py                              # uses release-01 by default
    ./00b-coverage-report.py --json <path>                # custom JSON path
    ./00b-coverage-report.py --json <path> --out <path>   # custom output report path
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import unicodedata
from pathlib import Path
# Root of the local UFO archive; every other path is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")
# Directory holding the downloaded source documents.
RAW = UFO_ROOT.joinpath("raw")
# Default input (war.gov metadata JSON) and default output (markdown report).
DEFAULT_JSON = UFO_ROOT.joinpath(
    "processing", "war-gov-metadata", "all-documents-release-01-basic.json"
)
DEFAULT_OUT = UFO_ROOT.joinpath(
    "processing", "war-gov-metadata", "coverage-report.md"
)
# Characters deleted outright (rather than turned into a hyphen) so that
# "pilot's", "pilot’s" and "pilots" all normalize to the same slug. Covers the
# ASCII apostrophe, the typographic single quotes U+2018/U+2019, and brackets.
_NORMALIZE_DELETE = str.maketrans("", "", "'\u2018\u2019[]")
# Any run of characters outside [a-z0-9] collapses to a single hyphen.
_NON_ALNUM_RUN = re.compile(r"[^a-z0-9]+")
# Leading zeros of a number that directly follows a letter or hyphen.
_ZERO_PADDING = re.compile(r"(?<=[a-z-])0+(\d)")


def normalize(s: str) -> str:
    """Return an ASCII-folded, lowercase, kebab-case slug of *s*.

    Steps: NFKD-decompose and drop combining marks, lowercase, delete
    apostrophes and square brackets, collapse every run of non-alphanumerics
    to a single hyphen, trim hyphens from both ends, and strip zero-padding
    from numbers that follow a letter or hyphen
    (d074 → d74, b001 → b1, section-001 → section-1).

    A falsy input (empty string or None) yields "".
    """
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFKD", s)
    folded = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    cleaned = folded.lower().translate(_NORMALIZE_DELETE)
    slug = _NON_ALNUM_RUN.sub("-", cleaned).strip("-")
    # One pass is enough: the digit kept after a stripped zero run is never
    # itself the start of another strippable run, so the result is a fixed point.
    return _ZERO_PADDING.sub(r"\1", slug)
def main():
ap = argparse.ArgumentParser(description="Compare raw/ vs war.gov metadata JSON.")
ap.add_argument("--json", default=str(DEFAULT_JSON), help="path to war-gov metadata JSON")
ap.add_argument("--out", default=str(DEFAULT_OUT), help="output report path (markdown)")
args = ap.parse_args()
json_path = Path(args.json)
if not json_path.exists():
sys.stderr.write(f"JSON not found: {json_path}\n")
sys.exit(1)
data = json.loads(json_path.read_text(encoding="utf-8"))
war_docs = data.get("documents", [])
print(f"war.gov JSON: {json_path.name}{len(war_docs)} docs")
# Build raw inventory by normalized basename (no extension)
raw_files = sorted(p for p in RAW.iterdir() if p.is_file() and not p.name.startswith("."))
raw_norm_to_path: dict[str, Path] = {}
for p in raw_files:
stem = p.stem
raw_norm_to_path[normalize(stem)] = p
print(f"raw/: {len(raw_files)} files")
print()
# Common noise tokens that hurt Jaccard accuracy
COMMON = {
"mission", "report", "uap", "the", "of", "and", "a", "in", "on", "for",
"with", "to", "from", "department", "war", "fbi", "nasa", "state",
"unresolved", "debrief", "summary", "transcript", "crew", "general",
"vol", "incident", "summaries", "photo", "video", "cable", "email",
"correspondence", "correspondance", "launch", "range", "fouler",
"force", "air", "navy", "between", "or", "year", "month",
"january", "february", "march", "april", "may", "june", "july",
"august", "september", "october", "november", "december", "redacted",
"sub", "sighting", "about", "kuwait", "kazakhstan", "papua", "guinea",
"syria", "iraq", "iran", "yemen", "djibouti", "japan", "greece",
"mexico", "germany", "turkey", "turkmenistan", "georgia", "tbilisi",
"indopacom", "middle", "east", "africa", "europe", "western", "united",
"states", "north", "south", "america",
}
def signature_tokens(s: str) -> set[str]:
return {t for t in normalize(s).split("-") if t and t not in COMMON}
def jaccard(a: set, b: set) -> float:
if not a or not b:
return 0.0
return len(a & b) / len(a | b)
def primary_id(s: str) -> str | None:
"""Extract a stable prefix identifier from titles/filenames.
Examples:
'DOW-UAP-D074, MISSION REPORT, ...''dow-uap-d74'
'DOW-UAP-D57-Mission-Report-Gulf-of-Aden-September-2020''dow-uap-d57'
'NASA-UAP-D003, GEMINI 7 TRANSCRIPT, 1965''nasa-uap-d3'
'FBI PHOTO B001''fbi-photo-b1'
Returns None if no ID prefix found.
"""
n = normalize(s)
patterns = [
r"^(dow-uap-[a-z]{1,4}\d+)",
r"^(dos-uap-d\d+)",
r"^(nasa-uap-[a-z]{1,3}\d+[a-z]?)",
r"^(fbi-photo-[a-z]\d+)",
]
for p in patterns:
m = re.match(p, n)
if m:
return m.group(1)
return None
raw_tokens_index = [(p, signature_tokens(p.stem), normalize(p.stem), primary_id(p.stem)) for p in raw_files]
present: list[tuple[dict, Path, str, float]] = []
missing: list[dict] = []
matched_raw_paths: set[Path] = set()
for doc in war_docs:
title = doc.get("title", "")
norm_title = normalize(title)
sig_war = signature_tokens(title)
asset = doc.get("asset_file_name") or ""
match = None
reason = ""
score = 1.0
war_pid = primary_id(title)
# Tier 1: direct normalized match
for p, _sig, raw_norm, _pid in raw_tokens_index:
if raw_norm == norm_title or (asset and raw_norm == normalize(asset)):
match = p; reason = "exact-norm"; break
# Tier 2: primary-id match (DOW-UAP-D74, etc.) — strongest semantic anchor
if not match and war_pid:
for p, _sig, _raw_norm, raw_pid in raw_tokens_index:
if p in matched_raw_paths:
continue
if raw_pid and raw_pid == war_pid:
match = p; reason = f"primary-id={war_pid}"; break
# Tier 3: containment (one inside the other) — high specificity
if not match:
for p, _sig, raw_norm, _pid in raw_tokens_index:
if p in matched_raw_paths:
continue
if len(norm_title) >= 12 and len(raw_norm) >= 12 and (
norm_title in raw_norm or raw_norm in norm_title
):
match = p; reason = "containment"; break
# Tier 4: signature-token Jaccard with threshold
if not match and sig_war:
best = None
best_score = 0.0
for p, sig_raw, _raw_norm, _pid in raw_tokens_index:
if p in matched_raw_paths:
continue
j = jaccard(sig_war, sig_raw)
if j > best_score:
best_score = j
best = p
if best is not None and best_score >= 0.50:
match = best; reason = f"jaccard={best_score:.2f}"; score = best_score
if match:
present.append((doc, match, reason, score))
matched_raw_paths.add(match)
else:
missing.append(doc)
# raw files NOT mentioned in war.gov
orphan_raw = [p for p in raw_files if p not in matched_raw_paths]
# Summary
print(f"{'='*60}")
print(f"Present in raw/: {len(present)} / {len(war_docs)}")
print(f"Missing from raw/: {len(missing)}")
print(f"Orphan files in raw/ (not in war.gov metadata): {len(orphan_raw)}")
print(f"{'='*60}")
# Build report
lines: list[str] = []
lines.append("# Coverage Report — war.gov/UFO vs /Users/guto/ufo/raw/")
lines.append("")
lines.append(f"- Source JSON: `{json_path}`")
lines.append(f"- raw/ inventory: {len(raw_files)} files")
lines.append(f"- war.gov inventory: {len(war_docs)} documents")
lines.append(f"- **Present**: {len(present)}")
lines.append(f"- **Missing**: {len(missing)} (need to be downloaded)")
lines.append(f"- **Orphan in raw/**: {len(orphan_raw)} (not in war.gov metadata)")
lines.append("")
lines.append("## Missing from raw/ (must be downloaded)")
lines.append("")
if missing:
lines.append("| record_id | title | agency | document_type | pdf_url_inferred |")
lines.append("|---|---|---|---|---|")
for d in missing:
url = d.get("pdf_url_inferred") or d.get("pdf_url") or ""
lines.append(
f"| {d.get('record_id','')} "
f"| {d.get('title','')} "
f"| {d.get('agency','')} "
f"| {d.get('document_type','')} "
f"| {url} |"
)
else:
lines.append("_(none)_")
lines.append("")
lines.append("## Present in raw/ (no action needed)")
lines.append("")
if present:
lines.append("| record_id | title | matched raw/ file | match reason |")
lines.append("|---|---|---|---|")
for d, p, reason, _score in present:
lines.append(f"| {d.get('record_id','')} | {d.get('title','')} | `{p.name}` | {reason} |")
else:
lines.append("_(none)_")
lines.append("")
lines.append("## Orphan files in raw/ (likely older releases or manual additions)")
lines.append("")
if orphan_raw:
for p in orphan_raw:
lines.append(f"- `{p.name}`")
else:
lines.append("_(none)_")
lines.append("")
out_path = Path(args.out)
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text("\n".join(lines), encoding="utf-8")
print(f"\nReport written: {out_path}")
if __name__ == "__main__":
    # Generate the report only when executed as a script, not when imported.
    main()