456 lines
18 KiB
Python
456 lines
18 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
42_sync_entity_stats.py — Bulletproof sync of every entity's reverse-reference
signals.

Three independent signal sources exist for an entity. Until now the UI used
only one of them and showed "0 menções" ("0 mentions") whenever the others
disagreed. This script rebuilds them all in a single pass:

  1. wiki_page_refs    — pages whose entities_extracted[] lists this entity.
                         Materialised back into the entity's mentioned_in[].

  2. db_chunk_mentions — count of rows in public.entity_mentions whose
                         chunk_pk matches a chunk that textually contains the
                         entity (ILIKE on canonical_name + aliases). Source of
                         truth for chat / search retrieval.

  3. cross_entity_refs — reverse-links discovered by traversing other entity
                         YAMLs: an event's uap_objects[] / observers[] /
                         organizations_involved[]; a location's events_here[];
                         a document's key_entities[].

After scanning, each entity's frontmatter is rewritten with:

    mentioned_in: [...]      # the page refs (canonical, not generated noise)
    total_mentions: <int>    # max(db_chunk_mentions, len(mentioned_in))
    documents_count: <int>   # max(distinct DB docs, distinct docs in mentioned_in)
    signal_sources:
      db_chunks: <int>
      page_refs: <int>
      cross_refs: <int>
    signal_strength: strong | weak | orphan
    last_lint: <utc>

When all three signals are zero the entity is marked as an orphan. It is only
moved to wiki/entities/_archived/<class>/<id>.md when --archive (or
--archive-only-junk, for junk names) is passed. Every non-dry run that updates
or archives at least one entity appends a summary record to wiki/log.md.

Idempotent: re-running converges. Safe to interrupt — writes are atomic.
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import re
|
||
|
|
import shutil
|
||
|
|
import sys
|
||
|
|
import unicodedata
|
||
|
|
from collections import defaultdict
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
try:
|
||
|
|
import yaml
|
||
|
|
import psycopg
|
||
|
|
except ImportError as e:
|
||
|
|
sys.stderr.write(f"pip3 install pyyaml psycopg[binary] # missing: {e}\n")
|
||
|
|
sys.exit(1)
|
||
|
|
|
||
|
|
|
||
|
|
# Repository root: two directory levels above the folder holding this script.
UFO_ROOT = Path(__file__).resolve().parents[2]

# Wiki locations this script reads from and writes to.
ENTITIES_BASE = UFO_ROOT.joinpath("wiki", "entities")
ARCHIVED_BASE = UFO_ROOT.joinpath("wiki", "entities", "_archived")
PAGES_BASE = UFO_ROOT.joinpath("wiki", "pages")
DOCS_BASE = UFO_ROOT.joinpath("wiki", "documents")
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")

# Connection string: DATABASE_URL wins, SUPABASE_DB_URL is the fallback.
DATABASE_URL = os.environ.get("DATABASE_URL") or os.environ.get("SUPABASE_DB_URL")
|
||
|
|
|
||
|
|
# Plural folder name under wiki/entities/ -> singular entity_class used in DB.
FOLDER_TO_CLASS = {
    "people": "person",
    "organizations": "organization",
    "locations": "location",
    "events": "event",
    "uap-objects": "uap_object",
    "vehicles": "vehicle",
    "operations": "operation",
    "concepts": "concept",
}
# Inverse lookup: entity_class -> folder name.
CLASS_TO_FOLDER = {cls: folder for folder, cls in FOLDER_TO_CLASS.items()}

# Frontmatter key holding an entity's own id; always "<entity_class>_id".
ID_FIELD_BY_CLASS = {cls: f"{cls}_id" for cls in CLASS_TO_FOLDER}

# Cross-entity fields that contain wikilinks pointing TO another entity,
# keyed by the class of the entity that declares them.
CROSS_REF_FIELDS = {
    "event": [
        "uap_objects",
        "observers",
        "organizations_involved",
        "vehicles_involved",
        "witnesses_analyses",
        "preceded_by",
        "followed_by",
        "related_events",
        "documented_in",
        "primary_location",
    ],
    "location": ["events_here"],
    "uap_object": ["observed_in_event", "secondary_events"],
    "operation": ["documents"],
    "document": ["key_entities", "key_events"],
}

# Matches [[target]] or [[target|label]]; group 1 captures the target.
WIKILINK_RE = re.compile(r"\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]")
|
||
|
|
|
||
|
|
|
||
|
|
def canonicalize_name(name: str) -> str:
    """Fold *name* into a kebab-case, ASCII-only identifier.

    Accents are removed via NFKD decomposition, the text is lower-cased, and
    every run of characters outside ``a-z`` / ``0-9`` becomes a single ``-``
    (leading and trailing dashes are dropped). Identifiers that would start
    with a digit get an ``x-`` prefix. Falsy input yields ``""``.

    The original note says this mirrors the algorithm in 03-dedup-entities.py.
    """
    if not name:
        return ""
    decomposed = unicodedata.normalize("NFKD", str(name))
    unaccented = "".join(ch for ch in decomposed if unicodedata.combining(ch) == 0)
    # One pass: any run of non-alphanumerics (dashes included) -> one dash.
    slug = re.sub(r"[^a-z0-9]+", "-", unaccented.lower()).strip("-")
    return f"x-{slug}" if slug[:1].isdigit() else slug
|
||
|
|
|
||
|
|
|
||
|
|
def utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = datetime.now(tz=timezone.utc)
    return f"{now:%Y-%m-%dT%H:%M:%S}Z"
|
||
|
|
|
||
|
|
|
||
|
|
def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into its YAML front matter and its body.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        ``(front_matter, body)``. When the file has no usable front matter —
        it does not start with ``---``, the closing ``---`` line is missing,
        the YAML does not parse, or the YAML is not a mapping — the result is
        ``({}, raw_text)`` with the file text returned untouched.

    Raises:
        OSError / UnicodeDecodeError: propagated from ``Path.read_text``.
    """
    raw = path.read_text(encoding="utf-8")
    if not raw.startswith("---"):
        return {}, raw

    # The closing fence must start a line. Searching for a bare "---" would
    # also match dashes inside a YAML value, and a missing fence (-1) would
    # silently slice off the last character and mangle the body.
    end = raw.find("\n---", 3)
    if end == -1:
        return {}, raw

    try:
        fm = yaml.safe_load(raw[3:end].strip()) or {}
    except yaml.YAMLError:
        return {}, raw
    if not isinstance(fm, dict):
        # A list or scalar document is not front matter callers can .get() on.
        return {}, raw

    # Skip the newline plus the three fence dashes, then leading blank lines.
    body = raw[end + 4:].lstrip("\n")
    return fm, body
|
||
|
|
|
||
|
|
|
||
|
|
def write_md(path: Path, fm: dict, body: str) -> None:
    """Write front matter plus body to *path* via a sibling ``.tmp`` file.

    The content is written to ``<path>.tmp`` first and then renamed over
    *path*, so a half-written YAML never replaces the original file. One
    blank separator is inserted unless *body* already starts with a newline.
    """
    front = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    gap = "\n" if not body.startswith("\n") else ""
    scratch = path.with_suffix(path.suffix + ".tmp")
    scratch.write_text("".join(("---\n", front, "---\n", gap, body)), encoding="utf-8")
    scratch.replace(path)
|
||
|
|
|
||
|
|
|
||
|
|
# Link prefixes accepted in wikilinks: singular, plural/folder and short forms,
# each mapped to the entity_class. Built once instead of on every call.
_WIKILINK_PREFIX_TO_CLASS = {
    "people": "person", "person": "person",
    "org": "organization", "organization": "organization", "organizations": "organization",
    "loc": "location", "location": "location", "locations": "location",
    "event": "event", "events": "event",
    "uap": "uap_object", "uap_object": "uap_object", "uap-objects": "uap_object",
    "vehicle": "vehicle", "vehicles": "vehicle",
    "op": "operation", "operation": "operation", "operations": "operation",
    "concept": "concept", "concepts": "concept",
}


def parse_wikilink_target(s: str) -> tuple[str | None, str | None]:
    """Parse ``[[prefix/id]]`` (or a bare ``prefix/id``) into ``(class, id)``.

    The first wikilink found in *s* is used; a ``|label`` part is ignored.
    When *s* contains no wikilink, the whole stripped string is treated as
    the target.

    Returns:
        ``(entity_class, id)`` on success, ``(None, None)`` when *s* is empty
        or not a string, has no ``/``, uses an unknown prefix, or has an empty
        id part (e.g. ``[[event/]]``).
    """
    if not s or not isinstance(s, str):
        return None, None
    m = WIKILINK_RE.search(s)
    target = m.group(1).strip() if m else s.strip()

    prefix, slash, ident = target.partition("/")
    if not slash:
        return None, None

    # Tolerate stray spaces around the prefix ("[[ event /x]]").
    cls = _WIKILINK_PREFIX_TO_CLASS.get(prefix.strip().lower())
    ident = ident.strip()
    if cls is None or not ident:
        # An empty id is not a usable reference; honour the documented contract.
        return None, None
    return cls, ident
|
||
|
|
|
||
|
|
|
||
|
|
def _page_entry_ids(cls: str, entry: object) -> list[str]:
    """Return every entity id that one ``entities_extracted`` entry stands for.

    An entry can be a plain string id, a wikilink, or a dict. For a dict the
    id comes from ``id``, then the class-specific id field, then the
    canonicalized ``name``; each alias is canonicalized and added as well.
    """
    ids: list[str] = []
    if isinstance(entry, str):
        _, linked_id = parse_wikilink_target(entry)
        primary = linked_id or canonicalize_name(entry)
        if primary:
            ids.append(primary)
    elif isinstance(entry, dict):
        primary = (
            entry.get("id")
            or entry.get(ID_FIELD_BY_CLASS.get(cls, "id"))
            or canonicalize_name(entry.get("name", ""))
        )
        if primary:
            ids.append(primary)
        aliases = entry.get("aliases") or []
        if isinstance(aliases, str):
            # A scalar alias must not be iterated character by character.
            aliases = [aliases]
        elif not isinstance(aliases, (list, tuple)):
            aliases = []
        for alias in aliases:
            alias_id = canonicalize_name(alias)
            if alias_id and alias_id != primary:
                ids.append(alias_id)
    return ids


def collect_page_refs() -> dict[tuple[str, str], set[str]]:
    """Index which pages mention which entities.

    Scans every ``p*.md`` below ``PAGES_BASE`` and reads the front-matter key
    ``entities_extracted: {people: [...], organizations: [...], ...}``. Each
    listed entity (and each of its aliases, so that an alias id can match an
    entity that dedup keyed on it) gets the page id added to its set. The page
    id is ``<parent folder name>/<file stem>``, e.g. ``doc-abc/p007``.

    Pages that cannot be read, or whose front matter is not a mapping, are
    skipped; unreadable ones are reported on stderr.

    Returns:
        ``{(entity_class, entity_id): {page_id, ...}}``.
    """
    refs: dict[tuple[str, str], set[str]] = defaultdict(set)
    for page_path in PAGES_BASE.rglob("p*.md"):
        try:
            fm, _ = read_md(page_path)
        except Exception as exc:  # best-effort sweep: report and move on
            sys.stderr.write(f"skipping unreadable page {page_path}: {exc}\n")
            continue
        if not isinstance(fm, dict):
            continue
        extracted = fm.get("entities_extracted") or {}
        if not isinstance(extracted, dict):
            continue

        page_id = f"{page_path.parent.name}/{page_path.stem}"
        for folder, entries in extracted.items():
            cls = FOLDER_TO_CLASS.get(folder)
            if not cls or not isinstance(entries, list):
                continue
            for entry in entries:
                for eid in _page_entry_ids(cls, entry):
                    refs[(cls, eid)].add(page_id)
    return refs
|
||
|
|
|
||
|
|
|
||
|
|
def _front_matter_links(fm: dict, fields: list[str]):
    """Yield ``(class, id)`` for each parseable wikilink in *fields* of *fm*.

    A field may hold a list or a single scalar; non-string items are passed
    through ``str()`` before parsing.
    """
    for field in fields:
        val = fm.get(field)
        items = val if isinstance(val, list) else ([val] if val else [])
        for item in items:
            tgt_cls, tgt_id = parse_wikilink_target(
                item if isinstance(item, str) else str(item))
            if tgt_cls and tgt_id:
                yield tgt_cls, tgt_id


def collect_cross_refs() -> dict[tuple[str, str], set[str]]:
    """Build reverse links from the wikilinks entities and documents declare.

    When entity X lists ``[[uap/OBJ-...]]`` in one of its ``CROSS_REF_FIELDS``,
    ``OBJ-...`` receives ``"<class of X>/<id of X>"`` as a back-reference.
    Documents under ``DOCS_BASE`` are swept the same way, using the fields
    registered for ``"document"`` in ``CROSS_REF_FIELDS`` (``key_entities`` as
    a fallback), and are recorded as ``"document/<file stem>"``.

    Files that cannot be read, or whose front matter is not a mapping, are
    skipped; unreadable ones are reported on stderr.

    Returns:
        ``{(target_class, target_id): {"<source_class>/<source_id>", ...}}``.
    """
    refs: dict[tuple[str, str], set[str]] = defaultdict(set)

    for folder, cls in FOLDER_TO_CLASS.items():
        cls_dir = ENTITIES_BASE / folder
        if not cls_dir.is_dir():
            continue
        fields = CROSS_REF_FIELDS.get(cls, [])
        for ent_path in cls_dir.glob("*.md"):
            try:
                fm, _ = read_md(ent_path)
            except Exception as exc:  # best-effort sweep: report and move on
                sys.stderr.write(f"skipping unreadable entity {ent_path}: {exc}\n")
                continue
            if not isinstance(fm, dict):
                continue
            self_id = fm.get(ID_FIELD_BY_CLASS.get(cls)) or ent_path.stem
            for target in _front_matter_links(fm, fields):
                refs[target].add(f"{cls}/{self_id}")

    # Documents are not entity folders, so they need their own sweep. Use the
    # configured document fields so the table and this loop cannot drift.
    doc_fields = CROSS_REF_FIELDS.get("document", ["key_entities"])
    for doc_path in DOCS_BASE.glob("*.md"):
        try:
            fm, _ = read_md(doc_path)
        except Exception as exc:
            sys.stderr.write(f"skipping unreadable document {doc_path}: {exc}\n")
            continue
        if not isinstance(fm, dict):
            continue
        for target in _front_matter_links(fm, doc_fields):
            refs[target].add(f"document/{doc_path.stem}")
    return refs
|
||
|
|
|
||
|
|
|
||
|
|
def collect_db_mentions(conn) -> dict[tuple[str, str], tuple[int, int]]:
    """Return ``{(class, id): (chunk_count, doc_count)}`` from public.entity_mentions.

    Every row of ``public.entities`` is returned (LEFT JOINs), so entities
    without mentions map to ``(0, 0)``. ``chunk_count`` counts mention rows,
    ``doc_count`` counts distinct ``chunks.doc_id`` values.
    """
    query = """
        SELECT e.entity_class, e.entity_id,
               COUNT(em.chunk_pk)::int AS chunks,
               COUNT(DISTINCT c.doc_id)::int AS docs
        FROM public.entities e
        LEFT JOIN public.entity_mentions em ON em.entity_pk = e.entity_pk
        LEFT JOIN public.chunks c ON c.chunk_pk = em.chunk_pk
        GROUP BY e.entity_class, e.entity_id
        """
    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return {
        (cls, eid): (chunks or 0, docs or 0)
        for cls, eid, chunks, docs in rows
    }
|
||
|
|
|
||
|
|
|
||
|
|
def signal_strength(db_chunks: int, page_refs: int, cross_refs: int) -> str:
    """Classify an entity as ``"orphan"``, ``"strong"`` or ``"weak"``.

    * orphan — the three counts sum to zero.
    * strong — at least 3 DB chunks, or at least 3 page refs, or at least one
      of each (the two sources corroborate each other).
    * weak   — anything else, including entities backed only by cross refs.
    """
    if db_chunks + page_refs + cross_refs == 0:
        return "orphan"
    corroborated = db_chunks >= 1 and page_refs >= 1
    is_strong = max(db_chunks, page_refs) >= 3 or corroborated
    return "strong" if is_strong else "weak"
|
||
|
|
|
||
|
|
|
||
|
|
def archive_entity(path: Path, dry_run: bool, archived_count: list[int]) -> None:
    """Move an entity file into the archive tree, keeping its relative path.

    ``archived_count[0]`` is incremented even in dry-run mode, so the summary
    reports how many files would be archived. Nothing on disk changes when
    *dry_run* is true.
    """
    destination = ARCHIVED_BASE.joinpath(path.relative_to(ENTITIES_BASE))
    archived_count[0] = archived_count[0] + 1
    if not dry_run:
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(path), str(destination))
|
||
|
|
|
||
|
|
|
||
|
|
def _archive_with_frontmatter(ent_path: Path, fm: dict, body: str,
                              dry_run: bool, archived_count: list[int]) -> None:
    """Persist the refreshed frontmatter, then archive the entity file.

    Writing first means the archived copy carries the signal fields computed
    in this run instead of whatever stale values the file had before.
    """
    if not dry_run:
        write_md(ent_path, fm, body)
    archive_entity(ent_path, dry_run, archived_count)


def main() -> int:
    """Recompute every entity's reference signals and rewrite its frontmatter.

    Steps: collect page refs, cross-entity refs and DB mention counts; then,
    for each entity file under ``ENTITIES_BASE``, classify it with
    ``signal_strength`` and write ``mentioned_in``, ``total_mentions``,
    ``documents_count``, ``signal_sources``, ``referenced_by``,
    ``signal_strength`` and ``last_lint``. Orphans are archived only with
    ``--archive`` (all orphans) or ``--archive-only-junk`` (orphans with a
    junk-looking name). ``--dry-run`` computes everything but writes nothing.
    A summary is printed and, for non-dry runs that touched anything,
    appended to ``LOG_PATH``.

    Returns:
        Process exit code: ``0`` on success, ``1`` when no database URL is set.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--archive", action="store_true",
                   help="actually move orphans to wiki/entities/_archived/. "
                        "By default we only mark them — data is never lost.")
    p.add_argument("--archive-only-junk", action="store_true",
                   help="archive ONLY entities whose canonical_name is <=3 chars, "
                        "purely numeric, or matches obvious junk patterns")
    p.add_argument("--fix-obj-names", action="store_true",
                   help="rewrite OBJ-* canonical_name to '<event> UAP', "
                        "moving the full shape description to aliases")
    p.add_argument("--verbose", action="store_true")
    args = p.parse_args()

    print(f"scanning {ENTITIES_BASE} ...")
    if not DATABASE_URL:
        sys.stderr.write("DATABASE_URL not set — cannot read DB mentions\n")
        return 1

    print("collecting page refs from wiki/pages/ ...")
    page_refs = collect_page_refs()
    print(f" {len(page_refs)} entities referenced from {sum(len(v) for v in page_refs.values())} page rows")

    print("collecting cross-entity refs ...")
    cross_refs = collect_cross_refs()
    print(f" {len(cross_refs)} entities back-linked")

    print("reading DB entity_mentions ...")
    with psycopg.connect(DATABASE_URL) as conn:
        db_counts = collect_db_mentions(conn)
    print(f" {len(db_counts)} entities in DB")

    # Walk every entity YAML on disk. archived_count is a one-element list so
    # archive_entity can increment it in place.
    archived_count = [0]
    stats = {"strong": 0, "weak": 0, "orphan": 0, "updated": 0, "skipped": 0}

    for folder, cls in FOLDER_TO_CLASS.items():
        cls_dir = ENTITIES_BASE / folder
        if not cls_dir.is_dir():
            continue
        for ent_path in cls_dir.glob("*.md"):
            try:
                fm, body = read_md(ent_path)
            except Exception:
                stats["skipped"] += 1
                continue
            # No front matter, or YAML that is not a mapping: nothing to update.
            if not fm or not isinstance(fm, dict):
                stats["skipped"] += 1
                continue
            id_field = ID_FIELD_BY_CLASS.get(cls)
            eid = fm.get(id_field) or ent_path.stem
            key = (cls, eid)

            db_chunks, db_docs = db_counts.get(key, (0, 0))
            page_list = sorted(page_refs.get(key, set()))
            cross_list = sorted(cross_refs.get(key, set()))

            # Also count this entity's OWN outgoing wikilinks as signal —
            # if an OBJ has observed_in_event pointing to a real event, the
            # OBJ is anchored even when no one links back to it.
            own_outgoing: set[str] = set()
            for field in CROSS_REF_FIELDS.get(cls, []):
                val = fm.get(field)
                items = val if isinstance(val, list) else ([val] if val else [])
                for item in items:
                    tgt_cls, tgt_id = parse_wikilink_target(
                        item if isinstance(item, str) else str(item))
                    if tgt_cls and tgt_id:
                        own_outgoing.add(f"{tgt_cls}/{tgt_id}")

            all_cross = sorted(set(cross_list) | own_outgoing)
            strength = signal_strength(db_chunks, len(page_list), len(all_cross))

            stats[strength] += 1

            # Optional: clean up OBJ entities whose canonical_name is a long
            # shape description ending with the ID in parentheses. Move the
            # description to an alias and derive a short readable name from
            # the linked event.
            if args.fix_obj_names and cls == "uap_object":
                cn = str(fm.get("canonical_name") or "")
                if len(cn) > 80 and "UAP" in cn and "(" in cn and cn.endswith(")"):
                    obs_event = fm.get("observed_in_event")
                    event_cls, event_id = parse_wikilink_target(obs_event or "")
                    if event_cls == "event" and event_id:
                        # Strip the "EV-YYYY-MM-DD-" prefix to get a slug
                        slug = re.sub(r"^EV-\d{4}-[\dX]{2}-[\dX]{2}-", "", event_id)
                        new_name = slug.replace("-", " ").strip() or eid
                        new_name = new_name[:1].upper() + new_name[1:] + " UAP"
                        aliases = list(fm.get("aliases") or [])
                        if cn not in aliases:
                            aliases.insert(0, cn)
                        fm["canonical_name"] = new_name
                        fm["aliases"] = aliases

            # Mutate frontmatter — preserve unrelated keys.
            fm["mentioned_in"] = [f"[[{p}]]" for p in page_list]
            fm["total_mentions"] = max(db_chunks, len(page_list))
            fm["documents_count"] = max(db_docs, len({p.split("/", 1)[0] for p in page_list}))
            fm["signal_sources"] = {
                "db_chunks": int(db_chunks),
                "page_refs": len(page_list),
                "cross_refs": len(all_cross),
            }
            if all_cross:
                # Only the first 25 (sorted) references are stored.
                fm["referenced_by"] = [f"[[{r}]]" for r in all_cross[:25]]
            elif "referenced_by" in fm:
                del fm["referenced_by"]
            fm["signal_strength"] = strength
            fm["last_lint"] = utc_iso()

            # Optional archive paths — by default we KEEP everything, only mark.
            if strength == "orphan" and args.archive:
                _archive_with_frontmatter(ent_path, fm, body, args.dry_run, archived_count)
                continue
            if args.archive_only_junk:
                cn = str(fm.get("canonical_name") or "").strip()
                cn_id = cn.lower()
                is_junk = (
                    len(cn) <= 3
                    or re.fullmatch(r"[0-9.()-]+", cn) is not None
                    or cn_id in {"unknown", "none", "n/a", "na", "-", "—"}
                )
                if is_junk and strength == "orphan":
                    _archive_with_frontmatter(ent_path, fm, body, args.dry_run, archived_count)
                    continue

            stats["updated"] += 1
            if args.verbose:
                # Report the same cross-ref count that drove the classification.
                print(f" {strength:7} {cls}/{eid} db={db_chunks} pages={len(page_list)} cross={len(all_cross)}")
            if not args.dry_run:
                write_md(ent_path, fm, body)

    print()
    print(f" strong: {stats['strong']:>6}")
    print(f" weak: {stats['weak']:>6}")
    print(f" orphan: {stats['orphan']:>6} (archived: {archived_count[0]})")
    print(f" updated: {stats['updated']:>6}")
    print(f" skipped: {stats['skipped']:>6}")
    print(f" dry-run: {args.dry_run}")

    if not args.dry_run and (stats["updated"] > 0 or archived_count[0] > 0):
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        with LOG_PATH.open("a", encoding="utf-8") as f:
            f.write(
                f"\n## {utc_iso()} · SYNC_ENTITY_STATS\n"
                f"- script: scripts/maintain/42_sync_entity_stats.py\n"
                f"- strong: {stats['strong']}\n"
                f"- weak: {stats['weak']}\n"
                f"- orphan: {stats['orphan']} (archived: {archived_count[0]})\n"
                f"- updated: {stats['updated']}\n"
            )
    return 0
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|