250 lines
9.6 KiB
Python
Executable file
250 lines
9.6 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
00b-coverage-report.py — Compare /Users/guto/ufo/raw/ against the war.gov
|
|
metadata JSON extracted by 00-extract-war-gov.js. Reports:
|
|
|
|
- documents in war.gov that ARE present in raw/
|
|
- documents in war.gov that are MISSING from raw/ (need to be downloaded)
|
|
- files in raw/ that DO NOT appear in war.gov (manual additions / older releases / renamed)
|
|
|
|
Matching is lenient: both sides normalize to ASCII-folded lowercase kebab-case,
|
|
with extra noise stripped. We try filename match first, then title match.
|
|
|
|
Usage:
|
|
./00b-coverage-report.py # uses release-01 by default
|
|
./00b-coverage-report.py --json <path> # custom JSON path
|
|
./00b-coverage-report.py --json <path> --out <path> # custom output report path
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import unicodedata
|
|
from pathlib import Path
|
|
|
|
# Root of the local UFO archive; every other path below hangs off it.
UFO_ROOT = Path("/Users/guto/ufo")

# Directory whose files are compared against the war.gov metadata.
RAW = UFO_ROOT.joinpath("raw")

# Folder holding both the extracted metadata JSON and the generated report.
_METADATA_DIR = UFO_ROOT.joinpath("processing", "war-gov-metadata")

# Defaults for the --json and --out command-line options.
DEFAULT_JSON = _METADATA_DIR / "all-documents-release-01-basic.json"
DEFAULT_OUT = _METADATA_DIR / "coverage-report.md"
|
|
|
|
|
|
def normalize(s: str) -> str:
    """Reduce *s* to a lowercase kebab-case key used for lenient matching.

    The text is NFKD-decomposed and its combining marks are dropped, then it
    is lowercased. Apostrophes and square brackets are deleted, every other
    run of characters outside ``[a-z0-9]`` becomes a single hyphen, and
    hyphens are trimmed from both ends. Finally, zero-padding is removed from
    digit runs that directly follow a letter or a hyphen
    (``d074`` -> ``d74``, ``b001`` -> ``b1``, ``section-001`` -> ``section-1``).

    A falsy input yields ``""``.
    """
    if not s:
        return ""

    decomposed = unicodedata.normalize("NFKD", s)
    text = "".join(ch for ch in decomposed if not unicodedata.combining(ch)).lower()

    # Apostrophes and brackets vanish (so "O'Brien" -> "obrien"); a comma
    # becomes a separator like any other punctuation.
    for old, new in (("'", ""), (",", "-"), ("[", ""), ("]", "")):
        text = text.replace(old, new)

    # Hyphens are themselves outside [a-z0-9], so this one substitution
    # already leaves at most one hyphen between alphanumeric runs.
    slug = re.sub(r"[^a-z0-9]+", "-", text).strip("-")

    # Drop leading zeros until the string stops changing.
    while True:
        unpadded = re.sub(r"(?<=[a-z-])0+(\d)", r"\1", slug)
        if unpadded == slug:
            return slug
        slug = unpadded
|
|
|
|
|
|
# Tokens that occur in so many titles/filenames that they hurt Jaccard
# accuracy; they are removed before token sets are compared.
_COMMON_TOKENS = frozenset({
    "mission", "report", "uap", "the", "of", "and", "a", "in", "on", "for",
    "with", "to", "from", "department", "war", "fbi", "nasa", "state",
    "unresolved", "debrief", "summary", "transcript", "crew", "general",
    "vol", "incident", "summaries", "photo", "video", "cable", "email",
    "correspondence", "correspondance", "launch", "range", "fouler",
    "force", "air", "navy", "between", "or", "year", "month",
    "january", "february", "march", "april", "may", "june", "july",
    "august", "september", "october", "november", "december", "redacted",
    "sub", "sighting", "about", "kuwait", "kazakhstan", "papua", "guinea",
    "syria", "iraq", "iran", "yemen", "djibouti", "japan", "greece",
    "mexico", "germany", "turkey", "turkmenistan", "georgia", "tbilisi",
    "indopacom", "middle", "east", "africa", "europe", "western", "united",
    "states", "north", "south", "america",
})

# Identifier prefixes recognised at the start of a normalized title/filename.
# Compiled once here instead of on every lookup.
_PRIMARY_ID_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"^(dow-uap-[a-z]{1,4}\d+)",
        r"^(dos-uap-d\d+)",
        r"^(nasa-uap-[a-z]{1,3}\d+[a-z]?)",
        r"^(fbi-photo-[a-z]\d+)",
    )
)

# Containment is only trusted when both normalized strings are this long.
_MIN_CONTAINMENT_LEN = 12

# Minimum Jaccard similarity for a fuzzy token match to be accepted.
_JACCARD_THRESHOLD = 0.50


def _signature_tokens(s: str) -> set[str]:
    """Return the normalized tokens of *s* that are not common noise words."""
    return {t for t in normalize(s).split("-") if t and t not in _COMMON_TOKENS}


def _jaccard(a: set, b: set) -> float:
    """Return |a & b| / |a | b|, or 0.0 when either set is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def _primary_id(s: str) -> str | None:
    """Extract a stable prefix identifier from a title or filename.

    Examples:
        'DOW-UAP-D074, MISSION REPORT, ...' -> 'dow-uap-d74'
        'DOW-UAP-D57-Mission-Report-Gulf-of-Aden-September-2020' -> 'dow-uap-d57'
        'NASA-UAP-D003, GEMINI 7 TRANSCRIPT, 1965' -> 'nasa-uap-d3'
        'FBI PHOTO B001' -> 'fbi-photo-b1'

    Returns None if no ID prefix is found.
    """
    n = normalize(s)
    for pattern in _PRIMARY_ID_PATTERNS:
        m = pattern.match(n)
        if m:
            return m.group(1)
    return None


def _md_cell(value: object) -> str:
    """Render *value* as text that cannot break a Markdown table row.

    ``None`` becomes an empty cell, pipes are backslash-escaped and line
    breaks are replaced by spaces.
    """
    text = "" if value is None else str(value)
    text = text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
    return text.replace("|", "\\|")


def _match_document(doc: dict, raw_index: list, claimed: set) -> tuple | None:
    """Find the raw/ file that corresponds to one war.gov document.

    Args:
        doc: one entry of the metadata JSON's ``documents`` list; the keys
            ``title`` and ``asset_file_name`` are read.
        raw_index: ``(path, signature_tokens, normalized_stem, primary_id)``
            tuples, one per file in raw/.
        claimed: paths already matched to an earlier document. The fuzzy
            tiers (2-4) skip these; tier 1 does not.

    Returns:
        ``(path, reason, score)`` for the first tier that matches, else None.
    """
    title = doc.get("title") or ""
    norm_title = normalize(title)
    asset = doc.get("asset_file_name") or ""

    # Tier 1: direct normalized match on the title or the asset filename.
    # Raw keys come from extension-less stems, so the asset's stem is compared
    # too (asset_file_name presumably carries an extension such as ".pdf" --
    # confirm against the extractor). Empty keys are dropped so that an
    # untitled document cannot "exactly" match a file whose name normalizes
    # to the empty string.
    exact_keys = {norm_title}
    if asset:
        exact_keys.add(normalize(asset))
        exact_keys.add(normalize(Path(asset).stem))
    exact_keys.discard("")
    if exact_keys:
        for path, _sig, raw_norm, _pid in raw_index:
            if raw_norm in exact_keys:
                return path, "exact-norm", 1.0

    # Tier 2: primary-id match (DOW-UAP-D74, etc.) -- strongest semantic anchor.
    war_pid = _primary_id(title)
    if war_pid:
        for path, _sig, _raw_norm, raw_pid in raw_index:
            if path not in claimed and raw_pid == war_pid:
                return path, f"primary-id={war_pid}", 1.0

    # Tier 3: containment (one inside the other) -- high specificity.
    if len(norm_title) >= _MIN_CONTAINMENT_LEN:
        for path, _sig, raw_norm, _pid in raw_index:
            if path in claimed or len(raw_norm) < _MIN_CONTAINMENT_LEN:
                continue
            if norm_title in raw_norm or raw_norm in norm_title:
                return path, "containment", 1.0

    # Tier 4: best signature-token Jaccard score, accepted above a threshold.
    sig_war = _signature_tokens(title)
    if sig_war:
        best = None
        best_score = 0.0
        for path, sig_raw, _raw_norm, _pid in raw_index:
            if path in claimed:
                continue
            j = _jaccard(sig_war, sig_raw)
            if j > best_score:
                best_score = j
                best = path
        if best is not None and best_score >= _JACCARD_THRESHOLD:
            return best, f"jaccard={best_score:.2f}", best_score

    return None


def _render_report(json_path, raw_count, war_count, present, missing, orphan_raw) -> list[str]:
    """Build the Markdown report as a list of lines (joined by the caller).

    Args:
        json_path: path of the metadata JSON, shown in the header.
        raw_count: number of files found in raw/.
        war_count: number of documents in the metadata JSON.
        present: ``(doc, path, reason, score)`` tuples for matched documents.
        missing: documents with no matching raw/ file.
        orphan_raw: raw/ paths that no document matched.
    """
    lines: list[str] = [
        "# Coverage Report — war.gov/UFO vs /Users/guto/ufo/raw/",
        "",
        f"- Source JSON: `{json_path}`",
        f"- raw/ inventory: {raw_count} files",
        f"- war.gov inventory: {war_count} documents",
        f"- **Present**: {len(present)}",
        f"- **Missing**: {len(missing)} (need to be downloaded)",
        f"- **Orphan in raw/**: {len(orphan_raw)} (not in war.gov metadata)",
        "",
        "## Missing from raw/ (must be downloaded)",
        "",
    ]
    if missing:
        lines.append("| record_id | title | agency | document_type | pdf_url_inferred |")
        lines.append("|---|---|---|---|---|")
        for d in missing:
            url = d.get("pdf_url_inferred") or d.get("pdf_url") or ""
            cells = (
                d.get("record_id", ""),
                d.get("title", ""),
                d.get("agency", ""),
                d.get("document_type", ""),
                url,
            )
            lines.append("| " + " | ".join(_md_cell(c) for c in cells) + " |")
    else:
        lines.append("_(none)_")
    lines.append("")

    lines.append("## Present in raw/ (no action needed)")
    lines.append("")
    if present:
        lines.append("| record_id | title | matched raw/ file | match reason |")
        lines.append("|---|---|---|---|")
        for d, p, reason, _score in present:
            lines.append(
                f"| {_md_cell(d.get('record_id', ''))} "
                f"| {_md_cell(d.get('title', ''))} "
                f"| `{_md_cell(p.name)}` "
                f"| {_md_cell(reason)} |"
            )
    else:
        lines.append("_(none)_")
    lines.append("")

    lines.append("## Orphan files in raw/ (likely older releases or manual additions)")
    lines.append("")
    if orphan_raw:
        lines.extend(f"- `{p.name}`" for p in orphan_raw)
    else:
        lines.append("_(none)_")
    lines.append("")
    return lines


def main():
    """Compare raw/ against the war.gov metadata JSON and write a report.

    Reads ``--json`` (default ``DEFAULT_JSON``), inventories the non-hidden
    files directly inside ``RAW``, matches every document to at most one file
    through four tiers (exact normalized name, primary id, containment,
    Jaccard), prints a summary and writes a Markdown report to ``--out``
    (default ``DEFAULT_OUT``).

    Exits with status 1 if the JSON file or the raw/ directory is missing.
    """
    ap = argparse.ArgumentParser(description="Compare raw/ vs war.gov metadata JSON.")
    ap.add_argument("--json", default=str(DEFAULT_JSON), help="path to war-gov metadata JSON")
    ap.add_argument("--out", default=str(DEFAULT_OUT), help="output report path (markdown)")
    args = ap.parse_args()

    json_path = Path(args.json)
    if not json_path.exists():
        sys.stderr.write(f"JSON not found: {json_path}\n")
        sys.exit(1)
    data = json.loads(json_path.read_text(encoding="utf-8"))
    war_docs = data.get("documents", [])
    print(f"war.gov JSON: {json_path.name} — {len(war_docs)} docs")

    # Fail with a readable message rather than a traceback from iterdir().
    if not RAW.is_dir():
        sys.stderr.write(f"raw/ directory not found: {RAW}\n")
        sys.exit(1)

    # Inventory raw/: regular, non-hidden files, in sorted order so that the
    # first-match tiers are deterministic.
    raw_files = sorted(p for p in RAW.iterdir() if p.is_file() and not p.name.startswith("."))
    print(f"raw/: {len(raw_files)} files")
    print()

    raw_index = [
        (p, _signature_tokens(p.stem), normalize(p.stem), _primary_id(p.stem))
        for p in raw_files
    ]

    present: list[tuple[dict, Path, str, float]] = []
    missing: list[dict] = []
    matched_raw_paths: set[Path] = set()

    for doc in war_docs:
        found = _match_document(doc, raw_index, matched_raw_paths)
        if found is None:
            missing.append(doc)
            continue
        path, reason, score = found
        present.append((doc, path, reason, score))
        matched_raw_paths.add(path)

    # raw files NOT mentioned in war.gov
    orphan_raw = [p for p in raw_files if p not in matched_raw_paths]

    # Summary
    rule = "=" * 60
    print(rule)
    print(f"Present in raw/: {len(present)} / {len(war_docs)}")
    print(f"Missing from raw/: {len(missing)}")
    print(f"Orphan files in raw/ (not in war.gov metadata): {len(orphan_raw)}")
    print(rule)

    lines = _render_report(
        json_path, len(raw_files), len(war_docs), present, missing, orphan_raw
    )

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text("\n".join(lines), encoding="utf-8")
    print(f"\nReport written: {out_path}")


# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|