disclosure-bureau/scripts/33-compact-progress-log.py

86 lines
2.4 KiB
Python
Executable file

#!/usr/bin/env python3
"""
33-compact-progress-log.py — Compact raw/_batch-rebuild/progress.jsonl (and
failed.jsonl) by keeping ONLY the latest entry per doc_id.
Useful after multiple resume runs: instead of 200 rows for 115 docs (retries
included), get back to 115 (or fewer) — one per doc with the latest outcome.
Blank lines, lines that are not valid JSON, and rows without a doc_id are dropped.
Idempotent + safe: writes to a temp file, swaps it into place on success, and
keeps the prior version as `<name>.bak-<unix-timestamp>`.
Usage:
./33-compact-progress-log.py # compact in place
./33-compact-progress-log.py --dry-run # show what would change
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
# Hard-coded absolute base directory; the JSONL logs compacted by this script
# are looked up under <UFO_ROOT>/raw/_batch-rebuild.
UFO_ROOT = Path("/Users/guto/ufo")
LOG_DIR = UFO_ROOT.joinpath("raw", "_batch-rebuild")
def compact_file(path: Path, *, dry_run: bool) -> tuple[int, int]:
    """Rewrite a JSONL log so it holds only the last row seen for each doc_id.

    Blank lines, lines that are not valid JSON, JSON values that are not
    objects, and objects without a truthy ``doc_id`` are dropped. Surviving
    rows are written in the order their ``doc_id`` first appeared.

    When not a dry run, the compacted rows are written to ``<path>.tmp``, the
    current file is copied to ``<path>.bak-<unix-timestamp>``, and the temp
    file is then moved over *path* with a single ``os.replace`` so *path*
    never goes missing mid-swap.

    Args:
        path: JSONL file to compact.
        dry_run: When true, only count rows; nothing on disk is modified.

    Returns:
        ``(before, after)``: the number of JSON object rows read and the
        number of rows kept (one per distinct ``doc_id``). ``(0, 0)`` when
        *path* does not exist.
    """
    import shutil  # local import: only the backup step needs it

    before = 0
    # Re-assigning an existing key keeps that key's original position in the
    # dict, so output order follows each doc_id's first appearance while the
    # stored value is always the most recent row.
    latest: dict[str, dict] = {}
    try:
        with path.open("r", encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    row = json.loads(raw)
                except json.JSONDecodeError:
                    # Best effort: skip unparseable lines.
                    continue
                if not isinstance(row, dict):
                    # Valid JSON but not an object (e.g. `[1, 2]` or `5`): it
                    # cannot carry a doc_id and `.get` would raise on it.
                    continue
                before += 1
                doc = row.get("doc_id")
                if doc:
                    latest[doc] = row
    except FileNotFoundError:
        return (0, 0)

    after = len(latest)
    if dry_run:
        return (before, after)

    tmp = path.with_suffix(path.suffix + ".tmp")
    bak = path.with_suffix(path.suffix + f".bak-{int(time.time())}")
    try:
        with tmp.open("w", encoding="utf-8") as fh:
            fh.writelines(
                json.dumps(row, ensure_ascii=False) + "\n"
                for row in latest.values()
            )
        # Back up by copying rather than renaming, so `path` keeps existing
        # until the replace below swaps in the new content.
        try:
            shutil.copy2(path, bak)
        except FileNotFoundError:
            # File vanished after it was read; nothing left to back up.
            pass
        os.replace(tmp, path)
    finally:
        # Removes a stale temp file if anything above failed; a no-op after a
        # successful replace because the temp file has already been moved.
        tmp.unlink(missing_ok=True)
    return (before, after)
def main():
    """Compact each known log file under LOG_DIR and print a one-line summary.

    Accepts a single optional flag, ``--dry-run``, which is passed through to
    ``compact_file`` and switches the summary verb from "wrote" to
    "would write". Files that do not exist are reported and skipped.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    dry_run = parser.parse_args().dry_run

    # The verb depends only on the flag, so compute it once up front.
    verb = "would write" if dry_run else "wrote"

    for name in ("progress.jsonl", "failed.jsonl"):
        target = LOG_DIR / name
        if target.exists():
            before, after = compact_file(target, dry_run=dry_run)
            print(f"{name}: {before} rows → {after} unique doc_ids ({verb})")
        else:
            print(f"{name}: not present")


if __name__ == "__main__":
    main()