disclosure-bureau/scripts/03-dedup-entities.py
guto 4459bd17e4 phase-0: kill stubs, ship 20 curated anchor events, configure SMTP
- scripts/03-dedup-entities.py: stop emitting placeholder narrative ("Stub. Will
  be enriched in Phase 7"); write summary_status=none + null fields instead.
- scripts/maintain/41_strip_stubs.py: idempotent migration that cleaned the
  22,096 entity .md files (now zero stub strings in wiki/).
- scripts/synthesize/01_anchor_events.py: curated 20 anchor UAP events
  (Roswell, Nimitz Tic-Tac, Phoenix Lights, Operação Prato, AATIP, etc.) with
  bilingual Holmes-Watson narrative via claude -p --model sonnet
  (CLAUDE_CODE_OAUTH_TOKEN). All summary_status=curated, confidence=high.
- web/api/timeline + timeline-view: filter narrative-less events by default,
  render "curado" badge for hand-vetted ones, drop the date display alone.
- CLAUDE-schema-full.md: document the summary_status enum and the four states.
- docker-compose.yml: SMTP_HOST=mail.spacemail.com configured;
  GOTRUE_MAILER_AUTOCONFIRM flipped to false (real email confirmation working).
- .nirvana/outputs/.../systems-atelier/: 5 deliverables of the architecture
  audit that produced this roadmap.
2026-05-18 00:44:17 -03:00

668 lines
27 KiB
Python
Executable file

#!/usr/bin/env python3
"""
03-dedup-entities.py — Phase 5 — Entity dedup + upsert
For every page.md under wiki/pages/**/*.md:
1. Read frontmatter.entities_extracted
2. Canonicalize each entity name → kebab-case ASCII-fold id
3. Aggregate occurrences across pages (same kebab-case = same entity)
4. Upsert wiki/entities/<class>/<id>.md:
- If file missing: create with stub frontmatter + bilingual body
- If file exists: merge aliases, preserve manual edits to body, refresh
derived stats (mention_count per page, total_mentions, documents_count)
Does NOT populate mentioned_in[] — that's lint's job (script 04). This script
just creates/updates entity stubs so wiki-links resolve.
Idempotent: re-running with no new pages produces no changes (atomic write
suppresses writes when output is identical).
Usage:
./03-dedup-entities.py # process every page in wiki/pages/
./03-dedup-entities.py --doc-id <id> # only one document
./03-dedup-entities.py --dry-run # report what would change, don't write
"""
from __future__ import annotations
import argparse
import re
import sys
import unicodedata
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
try:
import yaml
except ImportError:
sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
sys.exit(1)
# Absolute root of the wiki checkout; every other path below hangs off it.
# NOTE(review): hard-coded to a single developer machine — presumably this
# should become configurable before the script runs anywhere else; confirm.
UFO_ROOT = Path("/Users/guto/ufo")
# Page markdown files whose frontmatter carries `entities_extracted`.
PAGES_BASE = UFO_ROOT / "wiki" / "pages"
# Parent directory of the per-class entity folders written by this script.
ENTITIES_BASE = UFO_ROOT / "wiki" / "entities"
# Markdown log that main() appends run summaries to.
LOG_PATH = UFO_ROOT / "wiki" / "log.md"
# Version stamps written into the frontmatter of newly created entity files.
WIKI_VERSION = "0.1.0"
SCHEMA_VERSION = "0.1.0"
# One tuple per "simple" entity class, in this field order:
# (class_name_in_page_extraction, dir_name_under_wiki/entities/, frontmatter type, entity_class field, id_field)
ENTITY_CLASSES = [
    ("people", "people", "entity", "person", "person_id"),
    ("organizations", "organizations", "entity", "organization", "organization_id"),
    ("locations", "locations", "entity", "location", "location_id"),
    ("vehicles", "vehicles", "entity", "vehicle", "vehicle_id"),
    ("operations", "operations", "entity", "operation", "operation_id"),
    ("concepts", "concepts", "entity", "concept", "concept_id"),
    # events and uap_objects have non-trivial ID schemes — handled separately
]
def utc_now_iso():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def canonicalize_name(name: str) -> str:
    """Turn a display name into a kebab-case, accent-folded identifier.

    The name is NFKD-decomposed, combining marks are dropped, the result is
    lower-cased, every character outside ``a-z0-9-`` becomes a dash, dash runs
    are collapsed and outer dashes trimmed. Returns ``""`` for a falsy name or
    when nothing usable is left.
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", name)
    folded = "".join(ch for ch in decomposed if unicodedata.combining(ch) == 0)
    dashed = re.sub(r"[^a-z0-9-]", "-", folded.lower())
    slug = re.sub(r"-+", "-", dashed).strip("-")
    # IDs cannot start with digit (per CLAUDE.md rule), so such slugs get an "x-" prefix.
    return "x-" + slug if slug[:1].isdigit() else slug
def event_id_from_entry(entry: dict) -> str:
    """Build an event id ``EV-<YYYY>-<MM>-<DD>-<slug>`` from ``{label, date}``.

    ``date`` may be ``YYYY-MM-DD``, ``YYYY-MM``, ``YYYY`` or anything else
    (treated as unknown); missing components are written as ``XX`` / ``XXXX``.
    The slug is the canonicalized label cut to 40 characters, or ``unlabeled``.

    The date is coerced to text first: a YAML loader hands back unquoted
    ``2004-11-14`` as a ``date`` object and unquoted ``1947`` as an ``int``,
    and passing either straight to ``re.match`` raises ``TypeError``.
    """
    raw_label = entry.get("label")
    label = str(raw_label) if raw_label else ""
    slug = canonicalize_name(label)[:40].strip("-") or "unlabeled"

    raw_date = entry.get("date")
    if isinstance(raw_date, datetime):
        # A full timestamp only contributes its calendar date to the id.
        raw_date = raw_date.date()
    date = str(raw_date) if raw_date else "NA"

    # Year is mandatory; month and day are optional, day only with a month.
    m = re.match(r"^(\d{4})(?:-(\d{2})(?:-(\d{2}))?)?$", date)
    if not m:
        return f"EV-XXXX-XX-XX-{slug}"
    year, month, day = m.groups()
    return f"EV-{year}-{month or 'XX'}-{day or 'XX'}-{slug}"
def uap_object_id_from_entry(entry: dict, event_id: str, index: int) -> str:
    """Build ``OBJ-<event ref>-<NN>`` for the index-th object of an event.

    The event ref is ``EV<year>-<SLUG>``, where the slug is the event id's
    label part with dashes removed, upper-cased and cut to 20 characters
    (``UNK`` when empty). Any event id that is not ``EV-<y>-<m>-<d>-...``
    yields the ref ``UNK``. ``entry`` is accepted but not consulted.
    """
    event_short = "UNK"
    if event_id.startswith("EV-"):
        # "2004-11-14-tic-tac-nimitz" -> ["2004", "11", "14", "tic-tac-nimitz"]
        pieces = event_id[3:].split("-", 3)
        if len(pieces) == 4:
            year, _month, _day, label_part = pieces
            slug_compact = label_part.replace("-", "").upper()[:20] or "UNK"
            event_short = f"EV{year}-{slug_compact}"
    return f"OBJ-{event_short}-{index:02d}"
def read_frontmatter_and_body(path: Path) -> tuple[dict, str]:
    """Parse a markdown file. Returns (frontmatter_dict, body_str).

    A file that does not start with ``---``, or has no closing delimiter,
    yields ``({}, <whole content>)``. Frontmatter that fails to parse, or that
    parses to something other than a mapping, is reported on stderr and
    returned as ``{}``; the body is still the text after the closing
    delimiter, with leading newlines removed.
    """
    content = path.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return {}, content
    # The closing delimiter has to begin a line. Searching for a bare "---"
    # would stop inside any frontmatter value that happens to contain three
    # dashes and silently truncate the metadata.
    end = content.find("\n---", 3)
    if end == -1:
        return {}, content
    fm_str = content[3:end].strip()
    # Skip the newline plus the three dashes of the closing delimiter.
    body = content[end + 4:].lstrip("\n")
    try:
        fm = yaml.safe_load(fm_str) or {}
    except yaml.YAMLError as e:
        sys.stderr.write(f"YAML error in {path}: {e}\n")
        fm = {}
    if not isinstance(fm, dict):
        # Callers use dict methods on the result; a scalar or list would crash them.
        sys.stderr.write(f"Frontmatter in {path} is not a mapping; ignoring it\n")
        fm = {}
    return fm, body
def write_frontmatter_and_body(path: Path, frontmatter: dict, body: str, dry_run: bool = False) -> bool:
    """Atomic write. Returns True if file was changed.

    For idempotency: if the file exists and the only differences are
    `last_ingest` / `last_lint` timestamps, do NOT rewrite.

    With ``dry_run`` the same comparison is made and the same value is
    returned, but nothing is written. Otherwise the content goes to a hidden
    sibling temp file that is then renamed over ``path``, so an interrupted
    run cannot leave a half-written entity file behind.
    """
    new_yaml = yaml.dump(frontmatter, allow_unicode=True, sort_keys=False, default_flow_style=False)
    # Exactly one blank line separates the closing delimiter from the body.
    gap = "" if body.startswith("\n") else "\n"
    new_content = f"---\n{new_yaml}---\n{gap}{body}"

    if path.exists():
        existing = path.read_text(encoding="utf-8")
        if existing == new_content:
            return False
        # Compare frontmatter excluding volatile timestamps
        existing_fm, existing_body = read_frontmatter_and_body(path)
        if not isinstance(existing_fm, dict):
            existing_fm = {}
        volatile = {"last_ingest", "last_lint"}
        snap_old = {k: v for k, v in existing_fm.items() if k not in volatile}
        snap_new = {k: v for k, v in frontmatter.items() if k not in volatile}
        if snap_old == snap_new and existing_body == body:
            return False  # only timestamps differ; treat as unchanged

    if dry_run:
        return True

    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp name does not end in ".md", so the "*.md" globs never pick it up.
    tmp_path = path.with_name(f".{path.name}.tmp")
    try:
        tmp_path.write_text(new_content, encoding="utf-8")
        tmp_path.replace(path)
    finally:
        # Only still present when the write or the rename failed.
        if tmp_path.exists():
            tmp_path.unlink()
    return True
def collect_entities_from_pages(doc_filter: str | None = None) -> dict:
    """
    Walk wiki/pages/**/*.md and collect all entity references.

    Only files whose frontmatter has ``type: page`` plus a non-empty
    ``page_id`` and ``doc_id`` are used. When ``doc_filter`` is given, a page
    is kept only if that text occurs somewhere in its path.

    Malformed input is skipped rather than crashing the run: a frontmatter or
    ``entities_extracted`` value that is not a mapping, and any list entry
    that is not a mapping.

    NOTE(review): with ``doc_filter`` set, the mention lists only cover the
    matching pages, and the upsert functions derive ``total_mentions`` /
    ``documents_count`` from these lists — confirm that is intended.

    Returns: {
      'people': { canonical_id: { 'aliases': set, 'mentions': [(page_id, role, doc_id), ...], 'roles': set } },
      'organizations': { ... },
      ...
      'events': { event_id: { 'labels': set, 'date': '...', 'mentions': [...] } },
      'uap_objects': { obj_id: { 'shape': ..., 'color': ..., 'mentions': [...], 'event_id': ... } },
    }
    """
    collected = {
        "people": defaultdict(lambda: {"aliases": set(), "mentions": [], "roles": set()}),
        "organizations": defaultdict(lambda: {"aliases": set(), "mentions": []}),
        "locations": defaultdict(lambda: {"aliases": set(), "mentions": [], "type": None}),
        "vehicles": defaultdict(lambda: {"aliases": set(), "mentions": [], "class": None}),
        "operations": defaultdict(lambda: {"aliases": set(), "mentions": [], "type": None}),
        "concepts": defaultdict(lambda: {"aliases": set(), "mentions": [], "class": None}),
        "events": defaultdict(lambda: {"labels": set(), "date": "NA", "mentions": []}),
        "uap_objects": defaultdict(lambda: {"shape": None, "color": None, "size_estimate": None, "mentions": [], "event_id": None}),
    }
    # Attribute each class keeps from the first entry that supplies a value.
    first_seen_field = {
        "locations": "type",
        "vehicles": "class",
        "operations": "type",
        "concepts": "class",
    }

    for page_path in sorted(PAGES_BASE.glob("**/*.md")):
        if doc_filter and doc_filter not in str(page_path):
            continue
        fm, _body = read_frontmatter_and_body(page_path)
        if not isinstance(fm, dict) or fm.get("type") != "page":
            continue
        page_id = fm.get("page_id", "")
        doc_id = fm.get("doc_id", "")
        if not page_id or not doc_id:
            continue
        entities = fm.get("entities_extracted") or {}
        if not isinstance(entities, dict):
            continue

        # Standard entity classes
        for class_name, _, _, _, _ in ENTITY_CLASSES:
            for entry in entities.get(class_name) or []:
                if not isinstance(entry, dict):
                    continue
                name = entry.get("name")
                if not name:
                    continue
                canonical = canonicalize_name(name)
                if not canonical:
                    continue
                bucket = collected[class_name][canonical]
                bucket["aliases"].add(name)
                # Only people carry a per-page role; everything else is "mentioned".
                role = (entry.get("role_in_page") if class_name == "people" else None) or "mentioned"
                bucket["mentions"].append((page_id, role, doc_id))
                if class_name == "people":
                    bucket["roles"].add(role)
                    continue
                extra_key = first_seen_field.get(class_name)
                if extra_key and not bucket.get(extra_key):
                    bucket[extra_key] = entry.get(extra_key)

        # Events
        page_event_ids: list[str] = []
        for entry in entities.get("events") or []:
            if not isinstance(entry, dict):
                continue
            label = entry.get("label")
            if not label:
                continue
            ev_id = event_id_from_entry(entry)
            page_event_ids.append(ev_id)
            bucket = collected["events"][ev_id]
            bucket["labels"].add(label)
            bucket["mentions"].append((page_id, "documented_in", doc_id))
            date = entry.get("date") or "NA"
            if date != "NA" and bucket["date"] == "NA":
                bucket["date"] = date

        # UAP objects — link to first event on the page if available
        for idx, entry in enumerate(entities.get("uap_objects") or [], start=1):
            if not isinstance(entry, dict):
                continue
            event_for_obj = page_event_ids[0] if page_event_ids else f"EV-XXXX-XX-XX-{canonicalize_name(doc_id)[:30]}"
            obj_id = uap_object_id_from_entry(entry, event_for_obj, idx)
            bucket = collected["uap_objects"][obj_id]
            # First non-empty value wins for each descriptive attribute.
            bucket["shape"] = bucket["shape"] or entry.get("shape")
            bucket["color"] = bucket["color"] or entry.get("color")
            bucket["size_estimate"] = bucket["size_estimate"] or entry.get("size_estimate")
            bucket["event_id"] = bucket["event_id"] or event_for_obj
            bucket["mentions"].append((page_id, "observation", doc_id))
    return collected
def _empty_body(entity_class: str, canonical_name: str) -> str:
"""Header-only body for new entities; narrative is filled by the synthesis
pipeline (scripts/synthesize/) when total_mentions ≥ 5, or by manual
curation. We never emit placeholder text — `summary_status: none` in the
frontmatter signals 'not yet synthesised' to the renderer."""
return (
f"# {canonical_name}\n\n"
"## Description (EN)\n\n"
"## Descrição (PT-BR)\n"
)
# Pre-built alias index: {dir_name: {alias_lower: path}} cached on first access.
# Keys are lower-cased file stems, canonical names and aliases. An entry for a
# directory is built by _ensure_alias_index the first time it is requested and
# extended by _register_in_index; nothing in this file removes entries.
_ALIAS_INDEX: dict[str, dict[str, Path]] = {}
def _ensure_alias_index(dir_name: str) -> dict[str, Path]:
    """Build alias→path map for a class folder once, cached. O(N) initial scan.

    Every readable ``*.md`` file under ``wiki/entities/<dir_name>/`` is indexed
    by its lower-cased ``canonical_name``, aliases and file stem. Files are
    visited in sorted order so that a name shared by several files resolves
    the same way on every run, and stems are applied last so that a file's own
    id is never shadowed by another file's alias. Files that cannot be read
    are reported on stderr and left out.
    """
    if dir_name in _ALIAS_INDEX:
        return _ALIAS_INDEX[dir_name]

    target_dir = ENTITIES_BASE / dir_name
    index: dict[str, Path] = {}
    if target_dir.exists():
        readable: list[Path] = []
        for entity_path in sorted(target_dir.glob("*.md")):
            try:
                fm, _ = read_frontmatter_and_body(entity_path)
            except (OSError, ValueError) as exc:
                # ValueError also covers undecodable bytes (UnicodeDecodeError).
                sys.stderr.write(f"Skipping unreadable entity file {entity_path}: {exc}\n")
                continue
            if not isinstance(fm, dict):
                fm = {}
            readable.append(entity_path)
            cname = fm.get("canonical_name")
            if isinstance(cname, str) and cname.strip():
                index[cname.lower().strip()] = entity_path
            for alias in (fm.get("aliases") or []):
                if isinstance(alias, str) and alias.strip():
                    index[alias.lower().strip()] = entity_path
        # Index by stem (canonical_id) last so the id always maps to its own file.
        for entity_path in readable:
            index[entity_path.stem.lower()] = entity_path

    _ALIAS_INDEX[dir_name] = index
    return index
def _find_existing_entity_by_alias(
    dir_name: str,
    names: set[str],
    canonical_id_candidate: str,
) -> Path | None:
    """Look up an already-known entity file by id or by any of its names.

    The lower-cased candidate id is tried first, then each name (lower-cased
    and stripped) against the cached alias index for ``dir_name``. Names are
    tried in sorted order: iterating the set directly follows per-process
    string hashing, so when two names point at different files the winner
    would otherwise change from run to run. Returns the matching path or
    ``None``.
    """
    idx = _ensure_alias_index(dir_name)
    by_id = idx.get(canonical_id_candidate.lower())
    if by_id is not None:
        return by_id
    keys = {n.lower().strip() for n in names if isinstance(n, str) and n}
    for key in sorted(keys):
        if key in idx:
            return idx[key]
    return None
def _register_in_index(dir_name: str, path: Path, names: set[str], canonical_name: str | None = None) -> None:
    """Point the in-memory alias index for ``dir_name`` at ``path``.

    Registers the lower-cased file stem, the canonical name when given, and
    every non-blank string in ``names``, overwriting any earlier mapping for
    the same key.
    """
    idx = _ensure_alias_index(dir_name)
    keys = [path.stem.lower()]
    if canonical_name:
        keys.append(canonical_name.lower().strip())
    keys.extend(n.lower().strip() for n in names if isinstance(n, str) and n.strip())
    for key in keys:
        idx[key] = path
def _upsert_simple_entity(
    class_name: str,
    dir_name: str,
    type_value: str,
    entity_class: str,
    id_field: str,
    canonical_id: str,
    data: dict,
    dry_run: bool,
) -> tuple[str, bool, Path]:
    """Upsert a person/org/location/vehicle/operation/concept entity file.

    An existing file matched by id or alias is updated in place: aliases are
    unioned, the mention counters and ``last_ingest`` are replaced, and the
    class-specific type field is filled only if it was empty. The body is
    written back untouched. Otherwise a new file is created with the full
    default frontmatter and a header-only body.

    Returns (action, changed_bool, real_path).
    Action is 'created'|'updated'|'unchanged'|'merged-into-existing'.
    """
    alias_names = sorted(data.get("aliases", set()))
    # canonical_name = first alias in sort order — could be improved
    display_name = alias_names[0] if alias_names else canonical_id
    mention_total = len(data["mentions"])
    doc_total = len({doc for _, _, doc in data["mentions"]})

    # Reuse a file that already answers to one of these names — avoid creating duplicates
    match = _find_existing_entity_by_alias(dir_name, data.get("aliases", set()), canonical_id)
    merged = match is not None and match.stem != canonical_id
    path = match if merged else ENTITIES_BASE / dir_name / f"{canonical_id}.md"

    if path.exists():
        fm, body = read_frontmatter_and_body(path)
        # Merge aliases (preserve existing + add new)
        fm["aliases"] = sorted(set(fm.get("aliases", []) or []) | set(alias_names))
        fm["total_mentions"] = mention_total
        fm["documents_count"] = doc_total
        fm["last_ingest"] = utc_now_iso()
        # (frontmatter key, collected-data key) refreshed only while still empty
        refreshable = {
            "locations": ("location_type", "type"),
            "vehicles": ("vehicle_class", "class"),
            "operations": ("operation_type", "type"),
            "concepts": ("concept_class", "class"),
        }.get(class_name)
        if refreshable is not None:
            fm_key, data_key = refreshable
            if not fm.get(fm_key) and data.get(data_key):
                fm[fm_key] = data[data_key]
        changed = write_frontmatter_and_body(path, fm, body, dry_run=dry_run)
        if merged:
            action = "merged-into-existing"
        elif changed:
            action = "updated"
        else:
            action = "unchanged"
        return (action, changed, path)

    # Create new. Key order is the order the frontmatter is written in.
    fm = {
        "schema_version": SCHEMA_VERSION,
        "type": type_value,
        "entity_class": entity_class,
        id_field: canonical_id,
        "canonical_name": display_name,
        "aliases": alias_names,
    }
    class_defaults = {
        "people": {"roles": [], "dates": {"born": None, "died": None}},
        "organizations": {"organization_type": None, "country": None},
        "locations": {"location_type": data.get("type"), "country": [], "coordinates": None},
        "vehicles": {"vehicle_class": data.get("class")},
        "operations": {"operation_type": data.get("type"), "status": None},
        "concepts": {
            "concept_class": data.get("class"),
            "domain": None,
            "definition_short": None,
            "definition_short_pt_br": None,
        },
    }
    fm.update(class_defaults.get(class_name, {}))
    fm["mentioned_in"] = []  # populated by lint
    fm["total_mentions"] = mention_total
    fm["documents_count"] = doc_total
    fm["related_concepts" if class_name == "concepts" else "related"] = []
    fm["enrichment_status"] = "none"
    fm["external_sources"] = []
    fm["last_ingest"] = utc_now_iso()
    fm["last_lint"] = None
    fm["wiki_version"] = WIKI_VERSION

    body = _empty_body(entity_class, display_name)
    write_frontmatter_and_body(path, fm, body, dry_run=dry_run)
    _register_in_index(dir_name, path, set(alias_names), display_name)
    return ("created", True, path)
def _upsert_event(event_id: str, data: dict, dry_run: bool) -> tuple[str, bool, Path]:
    """Upsert the entity file for one event.

    An existing file, found by event id or by any label, gets its aliases
    unioned with the labels and its mention counters and ``last_ingest``
    replaced; the body is written back as read. Otherwise a new file is
    created whose start and end dates are taken from the date components
    encoded in ``event_id``.

    Returns (action, changed_bool, real_path) with action
    'created'|'updated'|'unchanged'|'merged-into-existing'.
    """
    labels = sorted(data["labels"])
    display_name = labels[0] if labels else event_id
    mention_total = len(data["mentions"])
    doc_total = len({doc for _, _, doc in data["mentions"]})

    # Alias-match against existing events
    match = _find_existing_entity_by_alias("events", set(labels), event_id)
    merged = match is not None and match.stem != event_id
    path = match if merged else ENTITIES_BASE / "events" / f"{event_id}.md"

    if path.exists():
        fm, body = read_frontmatter_and_body(path)
        fm["aliases"] = sorted(set(fm.get("aliases", []) or []) | set(labels))
        fm["total_mentions"] = mention_total
        fm["documents_count"] = doc_total
        fm["last_ingest"] = utc_now_iso()
        changed = write_frontmatter_and_body(path, fm, body, dry_run=dry_run)
        if merged:
            action = "merged-into-existing"
        elif changed:
            action = "updated"
        else:
            action = "unchanged"
        return (action, changed, path)

    # Recover the most precise date the id encodes; "XX"/"XXXX" mark unknown parts.
    date_start = "NA"
    parsed = re.match(r"^EV-(\d{4}|XXXX)-(\d{2}|XX)-(\d{2}|XX)-", event_id)
    if parsed and parsed.group(1) != "XXXX":
        year, month, day = parsed.groups()
        if month == "XX":
            date_start = year
        elif day == "XX":
            date_start = f"{year}-{month}"
        else:
            date_start = f"{year}-{month}-{day}"

    fm = {
        "schema_version": SCHEMA_VERSION,
        "type": "entity",
        "entity_class": "event",
        "event_id": event_id,
        "canonical_name": display_name,
        "aliases": labels,
        "event_class": "uap-encounter",
        "date_start": date_start,
        "date_end": date_start,
        "date_confidence": "low",
        "primary_location": None,
        "observers": [],
        "uap_objects": [],
        "documented_in": [],
        "total_mentions": mention_total,
        "documents_count": doc_total,
        "narrative_summary": None,
        "narrative_summary_pt_br": None,
        "summary_status": "none",
        "summary_confidence": None,
        "enrichment_status": "none",
        "external_sources": [],
        "last_ingest": utc_now_iso(),
        "last_lint": None,
        "wiki_version": WIKI_VERSION,
    }
    body = _empty_body("events", display_name)
    write_frontmatter_and_body(path, fm, body, dry_run=dry_run)
    _register_in_index("events", path, set(labels), display_name)
    return ("created", True, path)
def _find_existing_uap_object_by_event(event_id: str | None, shape: str, color: str, current_id: str) -> Path | None:
    """Find the file an observed object should be written to, if one exists.

    Returns ``None`` when there is no event id or no ``uap-objects`` folder.
    Otherwise the object's own file (``<current_id>.md``) wins when present.
    Failing that, the first file in sorted order that is anchored to the same
    event and whose stored shape and color are each either unknown/empty or
    equal to the given value is treated as the same object.

    The own-file check runs before the scan: inside the scan, a merely
    compatible file visited earlier would be returned instead of the exact
    match, and the visiting order is filesystem-dependent.

    NOTE(review): the scan re-reads every file in the folder on each call.
    """
    if not event_id:
        return None
    target_dir = ENTITIES_BASE / "uap-objects"
    if not target_dir.exists():
        return None

    own_path = target_dir / f"{current_id}.md"
    if own_path.exists():
        return own_path

    event_ref = f"[[event/{event_id}]]"
    wanted_shape = shape.lower()
    wanted_color = color.lower()
    for p in sorted(target_dir.glob("*.md")):
        try:
            fm, _ = read_frontmatter_and_body(p)
        except (OSError, ValueError) as exc:
            # ValueError also covers undecodable bytes (UnicodeDecodeError).
            sys.stderr.write(f"Skipping unreadable entity file {p}: {exc}\n")
            continue
        if not isinstance(fm, dict) or fm.get("observed_in_event") != event_ref:
            continue
        existing_shape = str(fm.get("shape") or "unknown").lower()
        existing_color = str(fm.get("color") or "unknown").lower()
        if existing_shape in ("unknown", "", wanted_shape) and (
            existing_color in ("unknown", "", wanted_color)
        ):
            return p
    return None
def _upsert_uap_object(obj_id: str, data: dict, dry_run: bool) -> tuple[str, bool, Path]:
    """Upsert the entity file for one observed UAP object.

    An existing file, either the object's own or a compatible one anchored to
    the same event, only has its mention counters and ``last_ingest``
    replaced. Otherwise a new file is created with shape/color from ``data``
    (``unknown`` when missing) and empty measurement fields.

    Returns (action, changed_bool, real_path) with action
    'created'|'updated'|'unchanged'|'merged-into-existing'.
    """

    def _unknown_range() -> dict:
        # A fresh dict per field, so no two frontmatter keys share one object.
        return {"min": None, "max": None, "confidence_band": "speculation"}

    shape = data.get("shape") or "unknown"
    color = data.get("color") or "unknown"
    event_id = data.get("event_id")
    display_name = f"{shape} {color} UAP ({obj_id})"
    mention_total = len(data["mentions"])
    doc_total = len({doc for _, _, doc in data["mentions"]})

    # If an existing uap_object is anchored to the same event with compatible shape/color, merge
    match = _find_existing_uap_object_by_event(event_id, shape, color, obj_id)
    merged = match is not None and match.stem != obj_id
    path = match if merged else ENTITIES_BASE / "uap-objects" / f"{obj_id}.md"

    if path.exists():
        fm, body = read_frontmatter_and_body(path)
        fm["total_mentions"] = mention_total
        fm["documents_count"] = doc_total
        fm["last_ingest"] = utc_now_iso()
        changed = write_frontmatter_and_body(path, fm, body, dry_run=dry_run)
        if merged:
            action = "merged-into-existing"
        elif changed:
            action = "updated"
        else:
            action = "unchanged"
        return (action, changed, path)

    fm = {
        "schema_version": SCHEMA_VERSION,
        "type": "entity",
        "entity_class": "uap_object",
        "uap_object_id": obj_id,
        "canonical_name": display_name,
        "observed_in_event": f"[[event/{event_id}]]" if event_id else None,
        "secondary_events": [],
        "shape": shape,
        "color": color,
        "size_estimate_m": _unknown_range(),
        "features": [],
        "altitude_ft": _unknown_range(),
        "speed_kts": _unknown_range(),
        "maneuver_descriptors": [],
        "sensor_observations": [],
        "visual_records": [],
        "total_mentions": mention_total,
        "documents_count": doc_total,
        "evidence_anchored": [],
        "hypotheses_addressing": [],
        "confidence_band_overall": "low",
        "last_ingest": utc_now_iso(),
        "last_lint": None,
        "wiki_version": WIKI_VERSION,
    }
    body = _empty_body("uap_objects", display_name)
    write_frontmatter_and_body(path, fm, body, dry_run=dry_run)
    _register_in_index("uap-objects", path, set(), display_name)
    return ("created", True, path)
def _report_upsert(stats: dict, action: str, changed: bool, real_path: Path, source_id: str) -> None:
    """Count one upsert result in ``stats`` and print a line if a file changed."""
    # Bucket merged-but-unchanged into "unchanged"
    if action == "merged-into-existing" and not changed:
        stats["unchanged"] += 1
    else:
        stats[action] += 1
    if changed:
        rel = real_path.relative_to(UFO_ROOT)
        # Show both ids for a merge: the collected id and the file it landed in.
        tag = f"merged ({source_id} -> {real_path.stem})" if action == "merged-into-existing" else action
        print(f" [{tag}] {rel}", flush=True)


def main():
    """CLI entry point: collect entities from pages, upsert them, log the run.

    Simple classes are processed first, then events, then UAP objects. UAP
    objects go last because each one references an event, and that reference
    is rewritten when the event was merged into a different existing file.
    Unless ``--dry-run`` is given, a summary section is appended to the wiki
    log whenever at least one file was created, updated or merged into.
    """
    ap = argparse.ArgumentParser(description="Dedup and upsert entities from page extractions.")
    ap.add_argument("--doc-id", help="Only process pages of this doc_id")
    ap.add_argument("--dry-run", action="store_true", help="Report would-be changes without writing")
    args = ap.parse_args()

    print(f"Scanning {PAGES_BASE} for entity references...", flush=True)
    collected = collect_entities_from_pages(doc_filter=args.doc_id)
    totals = {k: len(v) for k, v in collected.items()}
    print(f"Found unique entities: {totals}", flush=True)

    stats = {"created": 0, "updated": 0, "unchanged": 0, "merged-into-existing": 0}

    # Simple classes
    for class_name, dir_name, type_value, entity_class, id_field in ENTITY_CLASSES:
        for canonical_id, data in collected[class_name].items():
            action, changed, real_path = _upsert_simple_entity(
                class_name, dir_name, type_value, entity_class, id_field,
                canonical_id, data, dry_run=args.dry_run,
            )
            _report_upsert(stats, action, changed, real_path, canonical_id)

    # Events. Remember every id that ended up in a differently named file, as
    # decided by the upsert itself; re-deriving this afterwards could disagree
    # because the alias index keeps changing while events are processed.
    event_merge_map = {}
    for event_id, data in collected["events"].items():
        action, changed, real_path = _upsert_event(event_id, data, dry_run=args.dry_run)
        _report_upsert(stats, action, changed, real_path, event_id)
        if real_path.stem != event_id:
            event_merge_map[event_id] = real_path.stem

    # UAP objects — point each one at the event file its event was merged into
    for obj_id, data in collected["uap_objects"].items():
        if data.get("event_id") in event_merge_map:
            data["event_id"] = event_merge_map[data["event_id"]]
        action, changed, real_path = _upsert_uap_object(obj_id, data, dry_run=args.dry_run)
        _report_upsert(stats, action, changed, real_path, obj_id)

    print(f"\nSummary: created={stats['created']}, updated={stats['updated']}, "
          f"merged={stats['merged-into-existing']}, unchanged={stats['unchanged']}", flush=True)

    # A run whose only changes were merges still modified files, so log it too.
    wrote_files = stats["created"] or stats["updated"] or stats["merged-into-existing"]
    if not args.dry_run and wrote_files:
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(f"\n## {utc_now_iso()} — ENTITY DEDUP (Phase 5)\n")
            fh.write("- operator: archivist\n")
            fh.write("- script: scripts/03-dedup-entities.py\n")
            fh.write(f"- doc_filter: {args.doc_id or '(all)'}\n")
            fh.write(
                f"- created: {stats['created']}\n- updated: {stats['updated']}\n"
                f"- merged: {stats['merged-into-existing']}\n- unchanged: {stats['unchanged']}\n"
            )
            fh.write(f"- totals_after: {totals}\n")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()