#!/usr/bin/env python3
"""
23-smart-dedup.py — Phase 1 + Phase 3: aggressive entity cleanup.

Removes garbage entities that Haiku/extraction over-promoted:
  A. Stop-list filter — bare common nouns mentioned at most twice
  B. Substring/alias dedup — "FBI" vs "F-B-I" vs "Federal Bureau of Investigation"
  C. Compound-name detection — entities A+B that co-occur ≥3 pages → suggest merge
  D. Title-prefix recovery — "Chief Tereoken" appearing in raw page text but
     dedup created only "tereoken" + "chief"

Runs in two modes (one of these flags is required; there is no implicit default):
  --dry-run → report what would be deleted/merged, no writes
  --apply   → applies deletes and merges, removes orphans, updates affected page.md
              files to substitute merged names

Compound detection (C) can be aggressive, so it is report-only:
--report-compounds prints the candidate pairs and exits without merging.

Usage:
  ./23-smart-dedup.py --dry-run
  ./23-smart-dedup.py --apply
  ./23-smart-dedup.py --report-compounds
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import re
|
||
import sys
|
||
import unicodedata
|
||
from collections import Counter, defaultdict
|
||
from pathlib import Path
|
||
|
||
try:
|
||
import yaml
|
||
except ImportError:
|
||
sys.stderr.write("pip3 install pyyaml\n"); sys.exit(1)
|
||
|
||
|
||
# Hard-coded absolute root directory; every path below is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")
# Entity markdown files, one subdirectory per entity class (globbed as "*/*.md").
ENTITIES = UFO_ROOT / "wiki" / "entities"
# Page markdown files (globbed as "*/p*.md" by find_compound_candidates).
PAGES = UFO_ROOT / "wiki" / "pages"
# Wiki log file. NOTE(review): not referenced by the code visible in this file.
LOG = UFO_ROOT / "wiki" / "log.md"
|
||
|
||
|
||
# Common-noun stop list — these have no value as standalone entities.
# is_trivial() compares the *normalized* (accent-stripped, lower-cased) name
# against this set, so every entry must be lower-case.
# A match alone does not delete anything: filter_low_signal() removes a trivial
# entity only when its total_mentions is <= 2 (a frequently used name is kept —
# it's a real referent).
STOP_NOUNS = {
    # Roles / positions
    "agent", "agents", "officer", "officers", "chief", "captain", "general", "major",
    "colonel", "sergeant", "commander", "director", "secretary", "lieutenant", "lcdr",
    "cdr", "lt", "lt.", "cpl", "sgt", "supervisor", "inspector",
    # Generic structures
    "base", "office", "department", "bureau", "agency", "division", "section",
    "headquarters", "command", "post", "station", "branch", "unit", "group",
    "the", "the bureau", "the agency", "the department", "the office", "the file",
    # File / document terms
    "file", "files", "memo", "memorandum", "letter", "report", "form", "page", "pages",
    "subject", "date", "time", "case", "exhibit", "attachment", "enclosure",
    "signature", "stamp", "carbon copy", "cc", "ref", "reference", "annex",
    "envelope", "bag", "folder", "transmittal", "routing", "dispatch",
    # Generic descriptors
    "the inspector", "the agent", "the officer", "the witness", "the observer",
    "the subject", "the man", "the woman", "the pilot", "the operator",
    # Things that often slip into entities by mistake
    "technicians", "personnel", "staff", "team", "crew", "members",
    "operations", "operation",  # only when very generic — collisions handled by mention_count
    "departments",
}
|
||
|
||
# Common single-letter or 2-letter "entities" that are useless on their own.
# is_trivial() applies these with .match() to the normalized (lower-cased) name,
# so only lower-case letters and digits need to be covered.
TRIVIAL_PATTERNS = [
    re.compile(r"^[a-z0-9]$"),  # exactly one letter or digit
    re.compile(r"^[a-z]{1,2}$"),  # tiny initials
]
|
||
|
||
|
||
def normalize(s: str) -> str:
    """Return *s* folded for comparison: accents removed, lower-cased, trimmed.

    The text is decomposed to NFD so accents become separate combining code
    points; those are discarded, then the result is lower-cased and stripped
    of surrounding whitespace.
    """
    decomposed = unicodedata.normalize("NFD", s)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars).lower().strip()
|
||
|
||
|
||
# A closing front-matter delimiter must sit on its own line; "---" embedded in
# a YAML value (e.g. "title: a --- b") must not terminate the block.
_FRONT_MATTER_CLOSE = re.compile(r"^---[ \t]*\r?$", re.MULTILINE)


def read_md(path: Path) -> tuple[dict, str]:
    """Read a markdown file and split it into (front matter, body).

    Args:
        path: File to read as UTF-8.

    Returns:
        ``(front_matter, body)``.
        * Missing file → ``({}, "")``.
        * File that does not start with ``---`` or has no closing ``---``
          line → ``({}, <whole text>)``.
        * Otherwise the YAML between the delimiters parsed with
          ``yaml.safe_load`` and the text after the closing delimiter with
          leading newlines removed. Invalid YAML, or YAML whose top level is
          not a mapping, yields ``{}`` so callers can always use ``.get``.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}, ""
    if not text.startswith("---"):
        return {}, text

    # Start at offset 3 so the opening delimiter itself is never matched.
    closing = _FRONT_MATTER_CLOSE.search(text, 3)
    if closing is None:
        return {}, text

    try:
        fm = yaml.safe_load(text[3:closing.start()].strip()) or {}
    except yaml.YAMLError:
        fm = {}
    if not isinstance(fm, dict):
        # A list or scalar at the top level is not usable front matter.
        fm = {}

    body = text[closing.end():].lstrip("\n")
    return fm, body
|
||
|
||
|
||
def is_trivial(canonical_name: str, entity_id: str) -> bool:
    """Return True when *canonical_name* is worthless as a standalone entity.

    The name is normalized first, then considered trivial when it is:
      * empty,
      * an entry of ``STOP_NOUNS``,
      * matched by any of ``TRIVIAL_PATTERNS``, or
      * two or more words that are all stop nouns / "the"
        (e.g. "the bureau", "the office").

    Args:
        canonical_name: Display name of the entity.
        entity_id: Accepted for call-site compatibility; not consulted.
    """
    n = normalize(canonical_name)
    if not n:
        return True
    if n in STOP_NOUNS:
        return True
    if any(p.match(n) for p in TRIVIAL_PATTERNS):
        return True
    # All-stop-words sequence. A single stop noun never reaches this point
    # (it was caught by the membership test above), so only multi-word names
    # need checking here.
    words = n.split()
    return len(words) >= 2 and all(w in STOP_NOUNS or w == "the" for w in words)
|
||
|
||
|
||
def filter_low_signal(*, dry: bool) -> dict[str, list[Path]]:
    """A — Delete entities that are trivial AND rarely mentioned.

    Every ``*/*.md`` file under ``ENTITIES`` with non-empty front matter is
    sorted into one of three buckets:

    * ``deleted`` — trivial name and ``total_mentions`` ≤ 2; the file is
      unlinked unless *dry* is true.
    * ``kept_high_mention`` — trivial name but more than 2 mentions.
    * ``kept`` — name is not trivial.

    Args:
        dry: When true, only classify; never delete a file.

    Returns:
        Mapping of bucket name to the list of entity paths in that bucket.
    """
    deleted: list[Path] = []
    kept_high_mention: list[Path] = []
    kept: list[Path] = []

    for entity_path in ENTITIES.glob("*/*.md"):
        front, _body = read_md(entity_path)
        if not front:
            continue

        name = front.get("canonical_name") or entity_path.stem
        mentions = int(front.get("total_mentions") or 0)

        if not is_trivial(name, entity_path.stem):
            kept.append(entity_path)
            continue
        if mentions > 2:
            kept_high_mention.append(entity_path)
            continue

        # trivial + rarely mentioned = noise
        deleted.append(entity_path)
        if not dry:
            entity_path.unlink()

    return {"deleted": deleted, "kept_high_mention": kept_high_mention, "kept": kept}
|
||
|
||
|
||
def aliases_of(fm: dict) -> set[str]:
    """Return every normalized name an entity is known by.

    Combines ``canonical_name`` with the ``aliases`` list from the front
    matter, ignores entries that are not strings, normalizes the rest and
    drops any that normalize to the empty string.
    """
    candidates = [fm.get("canonical_name"), *(fm.get("aliases") or [])]
    names = {normalize(item) for item in candidates if isinstance(item, str)}
    names.discard("")
    return names
|
||
|
||
|
||
def dedupe_by_alias(*, dry: bool) -> dict[str, int]:
    """B — Merge entities of the same class whose alias sets overlap.

    Entities are grouped, per class directory, under every normalized name
    returned by ``aliases_of``. Whenever two or more not-yet-absorbed entities
    share a name, the one with the highest ``total_mentions`` wins (ties are
    broken by path so runs are reproducible). For each loser, its aliases and
    canonical name are added to the winner's ``aliases``, its mention count is
    added to the winner's, and its file is deleted.

    All merge state is kept in memory, so a dry run walks through exactly the
    same merges as a real run and reports the same counts.

    Args:
        dry: When true, nothing is written or deleted.

    Returns:
        ``{"merges": n, "deletes": n}``; both are incremented once per
        absorbed entity.
    """

    def mentions(front: dict) -> int:
        # Same coercion used when ranking, so sums never mix str and int.
        return int(front.get("total_mentions") or 0)

    def alias_strings(front: dict) -> set[str]:
        # Stringify so mixed YAML scalars (e.g. a bare year) cannot break
        # sorted(), and drop empty entries.
        return {str(a) for a in (front.get("aliases") or []) if a}

    stats = {"merges": 0, "deletes": 0}

    # Load every entity once: path -> (front matter, body).
    entities: dict[Path, tuple[dict, str]] = {}
    # class -> normalized alias -> every entity path claiming that alias.
    overlap: dict[str, dict[str, list[Path]]] = defaultdict(lambda: defaultdict(list))
    for p in sorted(ENTITIES.glob("*/*.md")):
        fm, body = read_md(p)
        if not fm or not isinstance(fm, dict):
            continue
        entities[p] = (fm, body)
        # Sorted so the order in which shared aliases are processed is stable.
        for alias in sorted(aliases_of(fm)):
            overlap[p.parent.name][alias].append(p)

    absorbed: set[Path] = set()
    for alias_map in overlap.values():
        for paths in alias_map.values():
            live = [pp for pp in paths if pp not in absorbed]
            if len(live) < 2:
                continue

            # Pick canonical winner: highest total_mentions, then path.
            live.sort(key=lambda pp: (-mentions(entities[pp][0]), str(pp)))
            winner_path, losers = live[0], live[1:]
            winner_fm, winner_body = entities[winner_path]

            for loser_path in losers:
                loser_fm, _ = entities[loser_path]
                merged = alias_strings(winner_fm) | alias_strings(loser_fm)
                merged.add(str(loser_fm.get("canonical_name") or loser_path.stem))
                winner_fm["aliases"] = sorted(merged)
                winner_fm["total_mentions"] = mentions(winner_fm) + mentions(loser_fm)
                absorbed.add(loser_path)
                stats["merges"] += 1
                stats["deletes"] += 1

            if not dry:
                # Persist the winner before removing any loser so an
                # interruption cannot lose the merged aliases.
                new_yaml = yaml.dump(
                    winner_fm, allow_unicode=True, sort_keys=False, default_flow_style=False
                )
                winner_path.write_text(f"---\n{new_yaml}---\n\n{winner_body}", encoding="utf-8")
                for loser_path in losers:
                    if loser_path.exists():
                        loser_path.unlink()
    return stats
|
||
|
||
|
||
def find_compound_candidates() -> list[tuple[str, str, str, int]]:
    """C — Find entity pairs (A, B) listed back-to-back on ≥3 pages.

    For every ``*/p*.md`` page under ``PAGES``, the ``entities_extracted``
    front-matter field is flattened into one name list (people, then
    organizations, then locations). Each pair of neighbouring, different
    names is recorded once per page; pairs seen on at least three pages are
    reported as likely halves of one compound name.

    This is a heuristic: it relies on the extractor emitting names in
    occurrence order and does not inspect the page body.

    Returns:
        ``[(class_a, name_a, name_b, page_count)]`` sorted by descending page
        count (ties ordered by class and names), where ``class_a`` is the
        ``entities_extracted`` key that ``name_a`` came from.
    """
    pair_pages: Counter = Counter()
    for page in sorted(PAGES.glob("*/p*.md")):
        try:
            fm, _ = read_md(page)
        except (OSError, UnicodeDecodeError):
            # Best-effort scan: an unreadable page is skipped, not fatal.
            continue
        if not fm or not isinstance(fm, dict):
            continue
        extracted = fm.get("entities_extracted") or {}
        if not isinstance(extracted, dict):
            continue

        # Across all entity classes, build one ordered list of names.
        names = []
        for cls_key in ("people", "organizations", "locations"):
            for entry in (extracted.get(cls_key) or []):
                if not isinstance(entry, dict):
                    continue
                raw = entry.get("name")
                if not isinstance(raw, str):
                    continue
                name = normalize(raw)
                if name:
                    names.append((cls_key, name))

        # A set, so a pair repeated on one page still counts as a single page.
        pairs_here = {
            (a_cls, a, b)
            for (a_cls, a), (_b_cls, b) in zip(names, names[1:])
            if a != b
        }
        pair_pages.update(pairs_here)

    # Filter to pairs that appear together on ≥ 3 pages.
    hits = [(cls_a, a, b, n) for (cls_a, a, b), n in pair_pages.items() if n >= 3]
    return sorted(hits, key=lambda item: (-item[3], item[0], item[1], item[2]))
|
||
|
||
|
||
def main():
    """Parse the command line and run the requested phase(s).

    ``--report-compounds`` prints up to 50 compound-name candidates and
    returns. Otherwise Phase 1A (trivial-entity filter) and Phase 1B
    (alias-based merge) run in dry-run mode when ``--dry-run`` is given and
    in apply mode otherwise, followed by a count of remaining entity files.
    At least one of the three flags is required.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="report only, don't delete")
    parser.add_argument("--apply", action="store_true", help="apply deletes + merges")
    parser.add_argument(
        "--report-compounds",
        action="store_true",
        help="just print compound candidates and exit",
    )
    opts = parser.parse_args()

    if not (opts.dry_run or opts.apply or opts.report_compounds):
        parser.error("provide --dry-run, --apply, or --report-compounds")

    if opts.report_compounds:
        candidates = find_compound_candidates()
        print("Top compound candidates (adjacent ≥3 pages):")
        for cls_a, first, second, pages in candidates[:50]:
            print(f" {pages:4d}× [{cls_a}] {first} + {second} → '{first} {second}'")
        print(f"\nTotal: {len(candidates)} candidates")
        return

    # --dry-run takes precedence when both --dry-run and --apply are given.
    dry = opts.dry_run
    mode = "DRY-RUN" if dry else "APPLY"

    print(f"=== Phase 1A: filter trivial low-mention entities ({mode}) ===")
    filtered = filter_low_signal(dry=dry)
    print(f" deleted (trivial + ≤2 mentions): {len(filtered['deleted'])}")
    print(f" kept (trivial but ≥3 mentions): {len(filtered['kept_high_mention'])}")
    print(f" kept (meaningful): {len(filtered['kept'])}")

    print(f"\n=== Phase 1B: alias-based merge ({mode}) ===")
    merged = dedupe_by_alias(dry=dry)
    print(f" pairs merged: {merged['merges']}")

    total_remaining = len(list(ENTITIES.glob("*/*.md")))
    print(f"\nRemaining entity files: {total_remaining}")


if __name__ == "__main__":
    main()
|