498 lines
22 KiB
Python
Executable file
498 lines
22 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
17-enrich-entities.py — Phase 6 — External enrichment of entities

For each entity in wiki/entities/<class>/<id>.md:
- total_mentions >= 3 → enrichment_status: deep (WebSearch + WebFetch + >=2 sources)
- total_mentions 1-2 → enrichment_status: shallow (1 query + internal knowledge)
- total_mentions == 0 → enrichment_status: none (skip)

Uses the Claude CLI (`claude -p --model haiku`) with the WebSearch and WebFetch tools,
following the same OAuth / Max-plan pattern as 02-vision-page.py.

Asks the model for structured JSON containing:
- biographical_summary EN + PT-BR
- external_sources[] (URL + title + publisher + key_facts + reliability_band)
- additional_aliases, verified_facts
- class-specific fields (person dates, org_type, location coordinates, etc.)

Updates:
- frontmatter: enrichment_status, external_sources, last_enriched, plus class-specific fields
- body: inserts/updates the "## Enrichment (EN)" + "## Enriquecimento (PT-BR)" sections
while PRESERVING the original description (the generated text is kept between the
`<!-- enrichment:start -->` ... `<!-- enrichment:end -->` markers for idempotency)

Idempotent:
- skips an entity whose `last_enriched` is less than ENRICHMENT_TTL_DAYS old (unless --force)
- re-running does not duplicate the section (it is replaced between the markers)

Entities are processed on ThreadPoolExecutor workers, with a 240 s default timeout on each CLI call — avoids CLI hangs.

Usage:
./17-enrich-entities.py --all [--workers 3] [--force] [--max N] [--tier deep|shallow|all]
./17-enrich-entities.py --class people # people only
./17-enrich-entities.py --entity-id j-edgar-hoover
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import concurrent.futures
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("Missing pyyaml. pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# --- Filesystem layout -------------------------------------------------------
UFO_ROOT = Path("/Users/guto/ufo")
ENTITIES_BASE = UFO_ROOT.joinpath("wiki", "entities")
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")

# --- Model / enrichment policy ----------------------------------------------
MODEL = "haiku"
WIKI_VERSION = "0.1.0"
ENRICHMENT_TTL_DAYS = 30  # entities enriched more recently than this are skipped
DEFAULT_WORKERS = 3
DEFAULT_TIMEOUT_S = 240  # default per-call timeout for the claude CLI, in seconds
DEEP_THRESHOLD = 3  # >= 3 mentions = deep tier

# --- Markers delimiting the generated section inside an entity page ----------
ENRICH_START = "<!-- enrichment:start -->"
ENRICH_END = "<!-- enrichment:end -->"

# Class folder names under wiki/entities/
ENTITY_DIRS = [
    "people",
    "organizations",
    "locations",
    "events",
    "uap-objects",
    "vehicles",
    "operations",
    "concepts",
]

# Held by safe_print() so that lines from worker threads do not interleave.
_print_lock = threading.Lock()
|
|
|
|
|
|
def safe_print(*args, **kwargs):
    """Print with ``flush=True`` while holding the module-level print lock.

    Arguments are forwarded to :func:`print` unchanged; the lock keeps output
    from concurrent worker threads from interleaving.
    """
    _print_lock.acquire()
    try:
        print(*args, **kwargs, flush=True)
    finally:
        _print_lock.release()
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    current = datetime.now(tz=timezone.utc)
    return current.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into its YAML frontmatter and its body.

    Args:
        path: Markdown file to read (decoded as UTF-8).

    Returns:
        ``(frontmatter, body)``. ``frontmatter`` is ``{}`` when the file has
        no opening ``---``, has no closing ``---`` line, holds invalid YAML,
        or holds YAML that is not a mapping. When no frontmatter block is
        found the whole file text is returned as the body; otherwise the body
        is the text after the closing fence with leading newlines removed.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}, text

    # The closing fence must START A LINE. Searching for a bare "---" anywhere
    # would stop at a dash run inside a frontmatter value (e.g.
    # `title: a --- b`), truncating the YAML and leaking the remainder of the
    # frontmatter into the body.
    fence = text.find("\n---", 3)
    if fence == -1:
        return {}, text

    body = text[fence + 4:].lstrip("\n")
    try:
        parsed = yaml.safe_load(text[3:fence].strip())
    except yaml.YAMLError:
        return {}, body
    # Callers use dict methods on the result; a scalar or list document
    # counts as "no usable frontmatter".
    return (parsed if isinstance(parsed, dict) else {}), body
|
|
|
|
|
|
def write_md(path: Path, fm: dict, body: str) -> bool:
    """Serialize frontmatter + body to ``path``, skipping no-op writes.

    The frontmatter is dumped as block-style YAML with key order preserved,
    followed by the closing fence and a blank line before the body (a body
    that already starts with a newline supplies that blank line itself).

    Returns:
        ``True`` if the file was written, ``False`` if it already held
        exactly the same text.
    """
    front = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    gap = "" if body.startswith("\n") else "\n"
    rendered = f"---\n{front}---\n{gap}{body}"

    already_current = path.exists() and path.read_text(encoding="utf-8") == rendered
    if already_current:
        return False

    path.write_text(rendered, encoding="utf-8")
    return True
|
|
|
|
|
|
def extract_json(text: str) -> dict:
    """Parse the JSON object contained in a model response.

    A leading/trailing markdown code fence is stripped first and the whole
    text is tried as JSON. If that fails, the first ``{`` is located and a
    real JSON decoder parses one value starting there, ignoring whatever
    follows it. Using the decoder (rather than counting braces) keeps braces
    that appear inside string values from ending the object early.

    Args:
        text: Raw response text, possibly with a fence, preamble or trailer.

    Returns:
        The decoded JSON value (an object in the expected case).

    Raises:
        json.JSONDecodeError: if the text has no ``{`` or the value starting
            at the first ``{`` is not valid JSON.
    """
    cleaned = text.strip()
    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
    cleaned = re.sub(r"\s*```$", "", cleaned)

    # Fast path: the response is exactly one JSON document.
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass

    start = cleaned.find("{")
    if start == -1:
        raise json.JSONDecodeError("no { in response", cleaned, 0)

    # raw_decode parses a single value at `start` and tolerates trailing junk.
    obj, _end = json.JSONDecoder().raw_decode(cleaned, start)
    return obj
|
|
|
|
|
|
def is_stale(last_enriched: str | None) -> bool:
|
|
if not last_enriched:
|
|
return True
|
|
try:
|
|
ts = datetime.strptime(last_enriched, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
return True
|
|
return (datetime.now(timezone.utc) - ts) > timedelta(days=ENRICHMENT_TTL_DAYS)
|
|
|
|
|
|
def build_prompt(entity_class: str, fm: dict, tier: str) -> str:
    """Assemble the enrichment prompt sent to the model for one entity.

    Args:
        entity_class: The entity's class; selects the guidance paragraph and
            is named in the output rules. Unknown classes get a generic hint.
        fm: Entity frontmatter. Reads ``canonical_name`` (falling back to
            ``entity_id``, then ``"?"``), ``aliases`` (first 8 are listed) and
            ``total_mentions``.
        tier: ``"deep"`` selects the multi-query research protocol; any other
            value selects the single-query protocol.

    Returns:
        The full prompt text, including the JSON schema the model must follow.
    """
    # Per-class research guidance.
    guidance_by_class = {
        "person": "Look up biographical info, role, organization, dates of activity. Distinguish from people with same name (disambiguation).",
        "organization": "Look up organization type, founding date, country, mission, leadership. Note any UAP/UFO involvement.",
        "location": "Look up coordinates (decimal lat/lon), country, region, type (city/airbase/sea/etc.), notable UAP-related history if any.",
        "event": "Look up historical accounts of this event — date, location, official statements, primary sources.",
        "uap_object": "External enrichment usually not applicable. Mark enrichment_status: none and explain why in summary.",
        "vehicle": "Look up vehicle/aircraft model, operator, specs (if applicable).",
        "operation": "Look up operation type (program/task-force/exercise), agency, date range, public knowledge.",
        "concept": "Look up canonical definition, legal/scientific context, related programs.",
    }
    guidance = guidance_by_class.get(entity_class, "Look up authoritative info; cite sources.")

    if tier == "deep":
        protocol = (
            "Use the WebSearch tool with 2-4 queries to find authoritative sources "
            "(Wikipedia, official government sites, peer-reviewed sources, established news outlets). "
            "Use WebFetch on the 2-3 best results to extract key facts. "
            "Provide >=2 distinct sources in external_sources[]."
        )
    else:
        protocol = (
            "Use the WebSearch tool with 1 query to confirm/disambiguate. "
            "Rely primarily on your own pretraining knowledge for the summary, but cite the 1 web source "
            "in external_sources[] (if found). External_sources may be empty if no reliable source surfaced."
        )

    name = fm.get("canonical_name") or fm.get("entity_id") or "?"
    mention_count = fm.get("total_mentions", 0)
    known_aliases = (fm.get("aliases") or [])[:8]
    alias_block = "\n".join([f" - {alias}" for alias in known_aliases]) or " (none)"
    accessed_at = utc_now_iso()

    return f"""You are an OSINT analyst for the Investigation Bureau — enriching one entity from a US Department of War UAP/UFO archive.

ENTITY CONTEXT:
- Class: {entity_class}
- Canonical name: {name}
- Aliases / variants in corpus:
{alias_block}
- Total mentions across corpus: {mention_count}
- Tier: {tier} (>= {DEEP_THRESHOLD} mentions = deep)

GUIDANCE:
{guidance}

RESEARCH PROTOCOL:
{protocol}

Output ONE JSON object only (no markdown fence, no commentary, no preamble). Schema:

{{
"enrichment_status": "{tier}",
"disambiguation_note": "Brief note distinguishing from similar names (e.g., 'NOT to be confused with X who is Y'). Empty string if not applicable.",
"biographical_summary_en": "3-6 sentences English. Focus on identity, role, period of activity, UAP relevance (if any). If genuinely cannot identify the entity (too generic, no public record), say so explicitly.",
"biographical_summary_pt_br": "Same content in Brazilian Portuguese (pt-br, NOT European Portuguese). Preserve UTF-8 accents (ç, ã, é, etc.). Keep proper nouns and English-language verbatim quotes in English.",
"additional_aliases": ["any alternative names, transliterations, common nicknames not already in the aliases list"],
"verified_facts": [
{{ "fact": "single verifiable claim", "source_url": "URL where it was found", "confidence_band": "high|medium|low" }}
],
"external_sources": [
{{ "url": "https://...", "title": "Page title", "publisher": "Wikipedia | NYT | DoD | etc.", "accessed_at": "{accessed_at}", "key_facts": ["short fact 1", "short fact 2"], "reliability_band": "high|medium|low" }}
],
"class_specific": {{
"person": {{"dates": {{"born": "YYYY-MM-DD or null", "died": "YYYY-MM-DD or null"}}, "primary_role": "...", "primary_organization": "..."}},
"organization": {{"organization_type": "intelligence-agency|military-branch|civilian-agency|private-company|ngo|other", "country": "ISO-2 or descriptor", "founded": "YYYY or null"}},
"location": {{"coordinates": {{"lat": 0.0, "lon": 0.0}}, "location_type": "city|airbase|sea|...", "country": ["ISO-2 codes"]}},
"event": {{"date_start": "YYYY-MM-DD or YYYY or null", "primary_location": "...", "event_class": "uap-encounter|disclosure|legal-filing|other"}},
"uap_object": {{"note": "External enrichment usually not applicable for UAP objects."}},
"vehicle": {{"vehicle_class": "aircraft|ship|...", "operator": "...", "model": "..."}},
"operation": {{"operation_type": "military-operation|research-program|task-force|exercise|other", "status": "active|concluded|classified|unknown"}},
"concept": {{"concept_class": "legal-instrument|phenomenon-type|doctrine|scientific-term|jargon|program-name|other", "definition_short_en": "1 sentence", "definition_short_pt_br": "1 frase em pt-br"}}
}}
}}

Rules:
- Provide ONLY the class_specific entry for `{entity_class}`. Other class entries can be omitted.
- If the entity is impossible to identify externally (generic descriptor, common name, redacted), set `external_sources: []` and explain in `biographical_summary_en`.
- ALWAYS preserve UTF-8 accents in PT-BR. Brazilian Portuguese, NOT European.
- Output ONLY the JSON. No fence, no preamble.
"""
|
|
|
|
|
|
def call_claude(prompt: str, timeout: int = DEFAULT_TIMEOUT_S) -> tuple[dict, dict]:
    """Run the claude CLI with WebSearch + WebFetch and parse its answer.

    The timeout is enforced by ``subprocess.run`` itself, which kills the
    child once ``timeout`` seconds have elapsed. Waiting on a helper thread
    inside a ``with ThreadPoolExecutor`` block does not give a hard timeout:
    leaving the block joins the worker, so the caller stays blocked until the
    subprocess's own (longer) timeout fires.

    Args:
        prompt: Prompt text, passed as the final CLI argument.
        timeout: Maximum number of seconds to wait for the CLI.

    Returns:
        ``(enriched, meta)``: the JSON object extracted from the CLI's
        ``result`` field, and a dict with ``duration_ms``, ``total_cost_usd``,
        ``num_turns`` and ``session_id`` copied from the CLI output.

    Raises:
        RuntimeError: on timeout, non-zero exit status, non-JSON CLI output,
            or when the CLI output has ``is_error`` set.
        json.JSONDecodeError: if the model's result text contains no
            parseable JSON object.
    """
    cmd = [
        "claude", "-p", "--model", MODEL,
        "--output-format", "json",
        "--max-turns", "8",
        "--allowedTools", "WebSearch,WebFetch",
        "--",
        prompt,
    ]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"claude CLI hung > {timeout}s — aborted") from exc

    if res.returncode != 0:
        raise RuntimeError(f"claude CLI rc={res.returncode}: {res.stderr[-1000:]}")

    try:
        cli_out = json.loads(res.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"claude CLI returned non-JSON output: {res.stdout[:300]!r}") from exc

    # `result` may be absent or null; normalise so slicing/parsing is safe.
    result_text = cli_out.get("result") or ""
    if cli_out.get("is_error"):
        raise RuntimeError(f"claude reported error: {result_text[:300]}")

    enriched = extract_json(result_text)
    meta = {key: cli_out.get(key) for key in ("duration_ms", "total_cost_usd", "num_turns", "session_id")}
    return enriched, meta
|
|
|
|
|
|
def merge_into_frontmatter(fm: dict, enriched: dict, tier: str, now_iso: str) -> dict:
    """Merge enrichment results into ``fm`` in place and return it.

    Always sets ``enrichment_status``, ``last_enriched``, ``external_sources``,
    ``disambiguation_note`` and ``verified_facts``, and replaces ``aliases``
    with the sorted union of existing and newly suggested aliases. Then merges
    the class-specific fields for ``fm["entity_class"]``.

    The class-specific payload is chosen in this order:
      1. the entry keyed by the entity's own class (``{"person": {...}}``);
      2. ``class_specific`` itself, when it is already flat (it carries one
         of the known field names directly);
      3. the first non-empty dict value, as a last resort.
    Checking (1) and (2) before (3) matters: a flat person or location
    payload holds dict-valued fields (``dates``, ``coordinates``) that must
    not be mistaken for the nested class entry.

    Args:
        fm: Entity frontmatter; mutated.
        enriched: Parsed model output.
        tier: Used for ``enrichment_status`` when the model omitted it.
        now_iso: Timestamp stored as ``last_enriched``.

    Returns:
        The same ``fm`` object.
    """
    cls = fm.get("entity_class")

    flat_field_names = ("dates", "primary_role", "organization_type", "coordinates",
                        "date_start", "vehicle_class", "operation_type", "concept_class")
    raw = enriched.get("class_specific")
    payload: dict = {}
    if isinstance(raw, dict) and raw:
        own_entry = raw.get(cls) if isinstance(cls, str) else None
        if isinstance(own_entry, dict) and own_entry:
            payload = own_entry
        elif any(name in raw for name in flat_field_names):
            payload = raw
        else:
            payload = next((v for v in raw.values() if isinstance(v, dict) and v), {})

    fm["enrichment_status"] = enriched.get("enrichment_status") or tier
    fm["last_enriched"] = now_iso

    # external_sources / verified_facts are replaced, not appended — each run
    # is a fresh enrichment. An empty disambiguation note keeps the old one.
    fm["external_sources"] = enriched.get("external_sources") or []
    fm["disambiguation_note"] = enriched.get("disambiguation_note") or fm.get("disambiguation_note", "")
    fm["verified_facts"] = enriched.get("verified_facts") or []

    # Aliases: union of existing and suggested, blank/non-string ones dropped.
    alias_set = set(fm.get("aliases") or [])
    for alias in (enriched.get("additional_aliases") or []):
        if isinstance(alias, str) and alias.strip():
            alias_set.add(alias.strip())
    fm["aliases"] = sorted(alias_set)

    if not payload:
        return fm

    # Classes whose fields are only filled in when the frontmatter lacks them.
    fill_when_missing = {
        "organization": ("organization_type", "country", "founded"),
        "location": ("coordinates", "location_type", "country"),
        "vehicle": ("vehicle_class", "operator", "model"),
        "operation": ("operation_type", "status"),
    }

    if cls == "person":
        # Person fields are overwritten whenever the model supplied a value.
        for key in ("dates", "primary_role", "primary_organization"):
            if payload.get(key):
                fm[key] = payload[key]
    elif cls in fill_when_missing:
        for key in fill_when_missing[cls]:
            if payload.get(key) and not fm.get(key):
                fm[key] = payload[key]
    elif cls == "event":
        # "NA" and "uap-encounter" are treated like missing values and may be
        # replaced by the enriched value.
        for key in ("date_start", "primary_location", "event_class"):
            value = payload.get(key)
            if value and (not fm.get(key) or fm.get(key) in ("NA", "uap-encounter", None)):
                fm[key] = value
    elif cls == "concept":
        if payload.get("concept_class"):
            fm["concept_class"] = payload["concept_class"]
        if payload.get("definition_short_en"):
            fm["definition_short"] = payload["definition_short_en"]
        if payload.get("definition_short_pt_br"):
            fm["definition_short_pt_br"] = payload["definition_short_pt_br"]

    return fm
|
|
|
|
|
|
def upsert_enrichment_section(body: str, enriched: dict) -> str:
    """Insert or replace the bilingual enrichment section in a page body.

    The section is wrapped in ENRICH_START / ENRICH_END markers. If a marked
    section already exists it is replaced (any further marked sections are
    removed, so the body ends up with exactly one). Otherwise the section is
    inserted before the first ``## Appearances in Corpus`` heading, or
    appended at the end when that heading is absent.

    Args:
        body: Current markdown body.
        enriched: Parsed model output; reads ``biographical_summary_en``,
            ``biographical_summary_pt_br``, ``disambiguation_note`` and
            ``external_sources``.

    Returns:
        The updated body.
    """
    en = (enriched.get("biographical_summary_en") or "").strip()
    pt = (enriched.get("biographical_summary_pt_br") or "").strip()
    disamb = (enriched.get("disambiguation_note") or "").strip()
    sources = enriched.get("external_sources") or []

    section_lines = [ENRICH_START, "## Enrichment (EN)", ""]
    if disamb:
        section_lines.extend([f"> **Disambiguation:** {disamb}", ""])
    section_lines.extend([en or "_No external enrichment available._", "", "## Enriquecimento (PT-BR)", ""])
    if disamb:
        section_lines.extend([f"> **Desambiguação:** {disamb}", ""])
    section_lines.extend([pt or "_Sem enriquecimento externo disponível._", ""])

    # Model output is untrusted in shape: skip entries that are not objects
    # and coerce key_facts items to text instead of crashing on them.
    source_lines = []
    for src in sources if isinstance(sources, list) else []:
        if not isinstance(src, dict):
            continue
        url = src.get("url") or ""
        title = src.get("title") or ""
        pub = src.get("publisher") or ""
        rel = src.get("reliability_band") or "?"
        facts = src.get("key_facts") or []
        if isinstance(facts, str):
            facts = [facts]
        key = "; ".join(str(fact) for fact in facts)
        line = f"- [{title or url}]({url}) · _{pub}_ · reliability: `{rel}`"
        if key:
            line += f" — {key}"
        source_lines.append(line)
    if source_lines:
        section_lines.extend(["## External Sources", ""])
        section_lines.extend(source_lines)
        section_lines.append("")

    section_lines.append(ENRICH_END)
    new_section = "\n".join(section_lines) + "\n"

    # Replace an existing marked section. The new text is spliced in by
    # slicing rather than handed to re.sub as a replacement template, where
    # backslashes in the model's text would be treated as escape sequences.
    pattern = re.compile(re.escape(ENRICH_START) + r".*?" + re.escape(ENRICH_END) + r"\n?", re.DOTALL)
    found = pattern.search(body)
    if found:
        tail = pattern.sub("", body[found.end():])
        return body[:found.start()] + new_section + tail

    # Otherwise insert before the first "## Appearances in Corpus", else append.
    marker = "## Appearances in Corpus"
    if marker in body:
        return body.replace(marker, new_section + "\n" + marker, 1)
    if not body.endswith("\n"):
        body += "\n"
    return body + "\n" + new_section
|
|
|
|
|
|
def list_entity_files(class_filter: str | None, entity_id_filter: str | None) -> list[Path]:
    """Collect entity ``.md`` files under ENTITIES_BASE.

    Args:
        class_filter: If given, only this class folder is scanned; otherwise
            every folder in ENTITY_DIRS is scanned, in that order.
        entity_id_filter: If given, only files whose stem equals it are kept.

    Returns:
        Matching paths, sorted within each class folder. Folders that do not
        exist are skipped.
    """
    folder_names = ENTITY_DIRS if not class_filter else [class_filter]
    matches: list[Path] = []
    for folder_name in folder_names:
        folder = ENTITIES_BASE / folder_name
        if not folder.exists():
            continue
        matches.extend(
            md_path
            for md_path in sorted(folder.glob("*.md"))
            if not entity_id_filter or md_path.stem == entity_id_filter
        )
    return matches
|
|
|
|
|
|
def tier_for(total_mentions: int) -> str:
    """Map a mention count to its enrichment tier.

    ``"deep"`` at DEEP_THRESHOLD mentions or more, ``"shallow"`` for at least
    one mention, ``"none"`` otherwise.
    """
    if total_mentions >= DEEP_THRESHOLD:
        label = "deep"
    elif total_mentions >= 1:
        label = "shallow"
    else:
        label = "none"
    return label
|
|
|
|
|
|
def process_entity(path: Path, *, force: bool, tier_filter: str, timeout: int) -> tuple[str, str, float]:
    """Enrich a single entity file and report what happened.

    Args:
        path: Entity markdown file.
        force: Re-enrich even when ``last_enriched`` is still fresh.
        tier_filter: ``"all"``, or the only tier that should be processed.
        timeout: Timeout in seconds passed to the CLI call.

    Returns:
        ``(action, tier, cost_usd)`` where ``action`` is ``"written"``,
        ``"unchanged"`` or one of the ``"skip-*"`` reasons. Skipped entities
        report a cost of ``0.0``.
    """
    fm, body = read_md(path)
    if not fm:
        return ("skip-no-fm", "none", 0.0)

    entity_class = fm.get("entity_class")
    if not entity_class:
        return ("skip-no-class", "none", 0.0)

    tier = tier_for(int(fm.get("total_mentions") or 0))

    # Work out whether this entity should be skipped before spending a call.
    skip_reason = None
    if tier == "none":
        skip_reason = "skip-zero"
    elif tier_filter not in ("all", tier):
        skip_reason = "skip-tier-filter"
    elif not (force or is_stale(fm.get("last_enriched"))):
        skip_reason = "skip-fresh"
    if skip_reason is not None:
        return (skip_reason, tier, 0.0)

    prompt = build_prompt(entity_class, fm, tier)
    started = time.time()
    enriched, meta = call_claude(prompt, timeout=timeout)
    elapsed = time.time() - started

    # Merge into a copy so the frontmatter read from disk is left untouched.
    merged_fm = merge_into_frontmatter(dict(fm), enriched, tier, utc_now_iso())
    changed = write_md(path, merged_fm, upsert_enrichment_section(body, enriched))
    cost = float(meta.get("total_cost_usd") or 0.0)

    mark = "✓" if changed else "·"
    safe_print(f" {mark} {path.parent.name}/{path.stem} ({tier}, {elapsed:.1f}s, ${cost:.4f})")
    outcome = "written" if changed else "unchanged"
    return (outcome, tier, cost)
|
|
|
|
|
|
def main():
    """CLI entry point: select entities, enrich them concurrently, log the run.

    Requires one of ``--all``, ``--class`` or ``--entity-id``. Entities are
    processed on a thread pool; a failure in one entity is counted and
    printed without stopping the others. A summary entry is appended to
    LOG_PATH when anything was written or any error occurred.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--all", action="store_true", help="enrich every entity (or use --class / --entity-id)")
    parser.add_argument("--class", dest="class_filter", choices=ENTITY_DIRS, help="restrict to one class")
    parser.add_argument("--entity-id", help="restrict to one entity stem (filename without .md)")
    parser.add_argument("--tier", choices=["all", "deep", "shallow"], default="all")
    parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS)
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_S)
    parser.add_argument("--force", action="store_true", help="re-enrich even if last_enriched is fresh")
    parser.add_argument("--max", type=int, default=0, help="limit to N entities (0 = no limit)")
    opts = parser.parse_args()

    if not any((opts.all, opts.class_filter, opts.entity_id)):
        parser.error("provide --all, --class, or --entity-id")

    targets = list_entity_files(opts.class_filter, opts.entity_id)
    if opts.max:
        targets = targets[:opts.max]
    if not targets:
        print("No entities found.", file=sys.stderr)
        return

    print(f"Enriching {len(targets)} entit(y/ies) with {opts.workers} workers, tier={opts.tier}, "
          f"force={opts.force}", flush=True)

    counters = dict.fromkeys(
        ["written", "unchanged", "skip-fresh", "skip-tier-filter",
         "skip-zero", "skip-no-fm", "skip-no-class", "errors"],
        0,
    )
    spent = 0.0
    began = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=opts.workers) as pool:
        pending = {}
        for target in targets:
            job = pool.submit(process_entity, target, force=opts.force,
                              tier_filter=opts.tier, timeout=opts.timeout)
            pending[job] = target
        for job in concurrent.futures.as_completed(pending):
            target = pending[job]
            try:
                action, _tier, cost = job.result()
                counters[action] = counters.get(action, 0) + 1
                spent += cost
            except Exception as exc:
                # One failing entity must not abort the rest of the batch.
                counters["errors"] += 1
                safe_print(f" ✗ {target.parent.name}/{target.stem}: {type(exc).__name__}: {exc}")

    elapsed = time.time() - began
    print(f"\nDone in {elapsed:.0f}s. Stats: {counters} · total_cost=${spent:.2f}", flush=True)

    if not (counters.get("written") or counters.get("errors")):
        return

    entry = (
        f"\n## {utc_now_iso()} — ENRICH (Phase 6)\n"
        f"- operator: profiler\n- script: scripts/17-enrich-entities.py\n"
        f"- tier_filter: {opts.tier}\n- workers: {opts.workers}\n"
        f"- written: {counters.get('written', 0)}\n"
        f"- unchanged: {counters.get('unchanged', 0)}\n"
        f"- skipped_fresh: {counters.get('skip-fresh', 0)}\n"
        f"- errors: {counters.get('errors', 0)}\n"
        f"- total_cost_usd: {spent:.4f}\n"
    )
    with open(LOG_PATH, "a", encoding="utf-8") as log_file:
        log_file.write(entry)


if __name__ == "__main__":
    main()
|