#!/usr/bin/env python3
|
|
"""
|
|
33-compact-progress-log.py — Compact the JSONL logs under raw/_batch-rebuild/
(progress.jsonl and failed.jsonl) by keeping ONLY the latest entry per doc_id.

Useful after multiple resume runs: instead of 200 rows for 115 docs (retries
included), get back to 115 (or fewer) — one per doc with the latest outcome.

Idempotent + safe: writes to a temp file, atomically moves it into place on
success, and keeps the prior version as `<name>.bak-<unix-timestamp>`.

Usage:
    ./33-compact-progress-log.py            # compact in place
    ./33-compact-progress-log.py --dry-run  # show what would change
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
|
|
# Absolute root of the UFO workspace; the batch-rebuild logs live beneath it.
UFO_ROOT = Path("/Users/guto/ufo")
# Directory holding the JSONL logs this script compacts.
LOG_DIR = UFO_ROOT.joinpath("raw", "_batch-rebuild")
|
|
|
|
|
|
def compact_file(path: Path, *, dry_run: bool) -> tuple[int, int]:
    """Compact one JSONL log so it holds only the last row seen per ``doc_id``.

    Blank lines are ignored. Lines that are not valid JSON, or that decode to
    something other than a JSON object, are skipped and reported once on
    stderr. Rows whose ``doc_id`` is missing or falsy are counted in the
    "before" total but are not carried over to the compacted output.

    Surviving rows keep the position at which their ``doc_id`` first appeared;
    only the row content is taken from the last occurrence.

    Unless ``dry_run`` is set, the result is written to ``<path>.tmp``, the
    current file is copied to ``<path>.bak-<unix-timestamp>``, and the temp
    file is then moved over ``path`` with ``os.replace``. The original is
    copied rather than renamed so that ``path`` never goes missing mid-swap.

    Args:
        path: JSONL file to compact.
        dry_run: When true, only count; nothing is written.

    Returns:
        ``(before, after)``: the number of JSON-object rows read and the number
        of distinct ``doc_id`` values kept. ``(0, 0)`` if ``path`` does not
        exist.
    """
    import shutil  # function-scope: not among the module's top-level imports

    # EAFP: open directly instead of exists()+open, which can race.
    try:
        src = path.open("r", encoding="utf-8")
    except FileNotFoundError:
        return (0, 0)

    rows: list[dict] = []
    skipped = 0
    with src:
        for line in src:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue
            # Valid JSON that is not an object (list, number, string, ...) has
            # no doc_id and would break the .get() lookup below.
            if not isinstance(row, dict):
                skipped += 1
                continue
            rows.append(row)
    before = len(rows)

    if skipped:
        print(
            f" ! {path.name}: skipped {skipped} malformed or non-object line(s)",
            file=sys.stderr,
        )

    # Later rows overwrite earlier ones under the same key; the dict keeps each
    # key at the position where it was first inserted.
    latest: dict[str, dict] = {}
    for row in rows:
        doc = row.get("doc_id")
        if doc:
            latest[doc] = row
    after = len(latest)

    if dry_run:
        return (before, after)

    tmp = path.with_suffix(path.suffix + ".tmp")
    bak = path.with_suffix(path.suffix + f".bak-{int(time.time())}")
    try:
        with tmp.open("w", encoding="utf-8") as out:
            out.writelines(
                json.dumps(row, ensure_ascii=False) + "\n" for row in latest.values()
            )
            # Make sure the bytes are on disk before the rename publishes them.
            out.flush()
            os.fsync(out.fileno())
        if path.exists():
            shutil.copy2(path, bak)
        os.replace(tmp, path)
    finally:
        # No-op after a successful replace; removes the partial file otherwise.
        tmp.unlink(missing_ok=True)
    return (before, after)
|
|
|
|
|
|
def main():
    """Compact each known log under ``LOG_DIR`` and print one status line per file.

    Accepts a single optional flag, ``--dry-run``, which reports the row counts
    without rewriting anything.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    opts = parser.parse_args()

    # The flag does not change between files, so pick the wording once.
    verb = "would write" if opts.dry_run else "wrote"

    for name in ("progress.jsonl", "failed.jsonl"):
        log_path = LOG_DIR / name
        if log_path.exists():
            before, after = compact_file(log_path, dry_run=opts.dry_run)
            print(f" ✓ {name}: {before} rows → {after} unique doc_ids ({verb})")
        else:
            print(f" ⊘ {name}: not present")
|
|
|
|
|
|
# Run the compaction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|