252 lines
8.4 KiB
Python
Executable file
252 lines
8.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
05-crop-bboxes.py — Eager crop generation from bounding boxes
|
|
|
|
For each page.md, read `images_detected[]`, `tables_detected[]`, and
|
|
`signatures_observed[]` (the elements whose visual content is worth showing
|
|
inline in chat replies). For each element with a bbox, crop the corresponding
|
|
region from the page PNG using Pillow and save to:
|
|
|
|
processing/crops/<doc-id>/<crop-id>.png
|
|
|
|
Where <crop-id> follows the convention:
|
|
IMG-<DOC>-p<NNN>-<NN> for images_detected
|
|
TBL-<DOC>-p<NNN>-<NN> for tables_detected
|
|
SIG-<DOC>-p<NNN>-<NN> for signatures_observed
|
|
|
|
Padding: 1% of page dimensions around each bbox (2% for tables) to avoid tight clipping.
|
|
|
|
Idempotent: skips crops whose output PNG already exists with non-zero size
|
|
(unless --force).
|
|
|
|
Usage:
|
|
./05-crop-bboxes.py # all docs
|
|
./05-crop-bboxes.py --doc-id <id> # single doc
|
|
./05-crop-bboxes.py --force # overwrite existing crops
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
from PIL import Image
|
|
except ImportError:
|
|
sys.stderr.write("Missing pillow. Run: pip3 install pillow\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Filesystem layout. Everything is anchored at UFO_ROOT.
UFO_ROOT = Path("/Users/guto/ufo")

# Directory scanned for page markdown files (*.md with YAML frontmatter).
PAGES_BASE = UFO_ROOT.joinpath("wiki", "pages")

# Directory of page PNGs (not referenced by the functions in this script,
# which resolve each page's `png_path` relative to the page file instead).
PNG_BASE = UFO_ROOT.joinpath("processing", "png")

# Output root: crops are written to CROPS_BASE/<doc-id>/<crop-id>.png.
CROPS_BASE = UFO_ROOT.joinpath("processing", "crops")

# Markdown log that main() appends a run summary to.
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")

# Base padding added around each bbox, as a fraction of the page size (1%).
PADDING_FRACTION = 0.01
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def read_frontmatter(path: Path) -> dict:
    """Parse the YAML frontmatter at the top of a markdown file.

    The frontmatter is the text between a leading ``---`` and the next line
    that consists solely of ``---`` (trailing whitespace allowed).

    Args:
        path: Markdown file to read (decoded as UTF-8).

    Returns:
        The frontmatter as a dict. An empty dict is returned when the file
        does not start with ``---``, has no closing delimiter line, contains
        invalid YAML, or the YAML document is not a mapping.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}

    first_newline = text.find("\n")
    if first_newline == -1:
        # Only an opening delimiter line: there is nothing to close it.
        return {}

    # The closing delimiter must be a line of its own. A plain substring
    # search would stop at any "---" embedded in a YAML value (for example a
    # title containing a dash run) and silently truncate the frontmatter.
    closing = re.compile(r"^---[ \t\r]*$", re.MULTILINE).search(
        text, first_newline + 1
    )
    if closing is None:
        return {}

    try:
        data = yaml.safe_load(text[3:closing.start()])
    except yaml.YAMLError:
        return {}

    # Callers use dict methods on the result; a list or scalar document
    # (or an empty one, which loads as None) is treated as "no frontmatter".
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def doc_id_short(doc_id: str) -> str:
    """Build the compact uppercase tag used inside crop ids.

    The id is upper-cased, at most one known collection prefix is removed
    (the first one in the list below that matches), every character other
    than A-Z / 0-9 is dropped, and the result is cut to 8 characters.

    'dow-uap-d54-mission-report-mediterranean-sea-na' → 'D54MISSI'
    'doc-65-hs1-abc'                                  → '65HS1ABC'

    Returns 'X' when nothing alphanumeric is left.
    """
    known_prefixes = ("DOW-UAP-", "DOS-UAP-", "NASA-UAP-", "FBI-PHOTO-", "DOC-")
    upper_id = doc_id.upper()

    # Only the first matching prefix is stripped, and only once.
    matched = next((p for p in known_prefixes if upper_id.startswith(p)), "")
    remainder = upper_id[len(matched):]

    # Keep ASCII letters and digits only, then cap the tag at 8 characters.
    tag = re.sub(r"[^A-Z0-9]", "", remainder)[:8]
    if not tag:
        return "X"
    return tag
|
|
|
|
|
|
def make_crop_id(prefix: str, doc_id: str, page_num: int, idx: int) -> str:
    """Compose a crop id of the form ``<prefix>-<DOC>-p<NNN>-<NN>``.

    ``<DOC>`` is the short tag from ``doc_id_short(doc_id)``; the page number
    is zero-padded to three digits and the index to two.
    """
    doc_tag = doc_id_short(doc_id)
    return f"{prefix}-{doc_tag}-p{page_num:03d}-{idx:02d}"
|
|
|
|
|
|
def crop_bbox(
    *,
    src_png: Path,
    dest_png: Path,
    bbox: dict,
    padding: float,
    force: bool,
) -> tuple[bool, str | None]:
    """Crop src_png by bbox and save the region to dest_png as a PNG.

    Args:
        src_png: Page image to crop from.
        dest_png: Output path; parent directories are created as needed.
        bbox: Mapping with ``x``, ``y``, ``w``, ``h`` given as fractions of
            the page width/height (missing keys default to 0).
        padding: Margin added on every side of the bbox, as a fraction of the
            page dimensions. The padded box is clamped to the page.
        force: When False, an existing non-empty dest_png is left untouched.

    Returns:
        ``(created, reason)``. ``created`` is True when a crop was written
        (``reason`` is then None). Otherwise ``reason`` is ``"exists"``,
        ``"zero-size-bbox"``, ``"bbox-out-of-bounds"`` or ``"error: <msg>"``.
        Failures are reported through the return value, never raised, so one
        bad element cannot abort a batch run.
    """
    if not force and dest_png.exists() and dest_png.stat().st_size > 0:
        return (False, "exists")
    try:
        x = float(bbox.get("x", 0))
        y = float(bbox.get("y", 0))
        w = float(bbox.get("w", 0))
        h = float(bbox.get("h", 0))
        if w <= 0 or h <= 0:
            return (False, "zero-size-bbox")

        # Pad each edge independently and clamp it to the page. Computing the
        # far edges from the original x/y keeps the padding symmetric even
        # when the near edge was clamped at 0.
        left = max(0.0, x - padding)
        top = max(0.0, y - padding)
        right = min(1.0, x + w + padding)
        bottom = min(1.0, y + h + padding)
        if right <= left or bottom <= top:
            # The bbox lies entirely off the page (or is not fractional);
            # cropping it would only produce an empty/garbage image.
            return (False, "bbox-out-of-bounds")

        with Image.open(src_png) as im:
            width, height = im.size
            # Pixel box: at least 1px in each dimension, always inside the image.
            px0 = min(int(round(left * width)), width - 1)
            py0 = min(int(round(top * height)), height - 1)
            px1 = min(width, max(px0 + 1, int(round(right * width))))
            py1 = min(height, max(py0 + 1, int(round(bottom * height))))
            region = im.crop((px0, py0, px1, py1))

            dest_png.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file and rename into place, so an
            # interrupted save never leaves a partial PNG that a later run
            # would mistake for a finished crop and skip.
            tmp_png = dest_png.with_name(dest_png.name + ".tmp")
            try:
                region.save(tmp_png, "PNG", optimize=True)
                tmp_png.replace(dest_png)
            finally:
                if tmp_png.exists():
                    tmp_png.unlink()
        return (True, None)
    except Exception as e:
        # Deliberate per-crop boundary: the caller counts and reports this.
        return (False, f"error: {e}")
|
|
|
|
|
|
def process_page(page_md: Path, force: bool) -> dict:
    """Generate every crop described in one page's frontmatter.

    Reads the frontmatter of page_md and, for each entry of
    ``images_detected`` (IMG), ``tables_detected`` (TBL) and
    ``signatures_observed`` (SIG) that carries a ``bbox``, crops the page PNG
    to ``CROPS_BASE/<doc_id>/<crop-id>.png``. Entries are numbered from 1 by
    their position in the list, so an entry without a bbox still consumes an
    index.

    Args:
        page_md: Path to the page markdown file.
        force: Overwrite crops that already exist.

    Returns:
        Counts ``{"created", "skipped", "error"}`` for this page. Files that
        are not ``type: page`` or lack ``doc_id`` / ``page_id`` / ``png_path``
        contribute all zeros. Failures are written to stderr and counted
        rather than raised.
    """
    counts = {"created": 0, "skipped": 0, "error": 0}

    fm = read_frontmatter(page_md)
    if not isinstance(fm, dict) or fm.get("type") != "page":
        return counts

    doc_id = fm.get("doc_id", "")
    page_id = fm.get("page_id", "")
    png_rel = fm.get("png_path", "")
    if not doc_id or not page_id or not png_rel:
        return counts
    # YAML may load an all-digit id as an int; ids are used as path segments.
    doc_id = str(doc_id)

    # png_path is interpreted relative to the page file's directory.
    src_png = (page_md.parent / png_rel).resolve()
    if not src_png.exists():
        sys.stderr.write(f"  ✗ source PNG missing: {src_png}\n")
        counts["error"] += 1
        return counts

    try:
        page_num = int(fm.get("page_number", 0))
    except (TypeError, ValueError):
        sys.stderr.write(f"  ✗ invalid page_number in {page_md}\n")
        counts["error"] += 1
        return counts

    # (crop-id prefix, frontmatter key, padding). Tables get twice the base
    # padding; images and signatures use the base padding.
    element_kinds = (
        ("IMG", "images_detected", PADDING_FRACTION),
        ("TBL", "tables_detected", PADDING_FRACTION * 2),
        ("SIG", "signatures_observed", PADDING_FRACTION),
    )

    for prefix, key, padding in element_kinds:
        items = fm.get(key) or []
        if not isinstance(items, list):
            sys.stderr.write(f"  ✗ {key} is not a list in {page_md}\n")
            counts["error"] += 1
            continue

        for idx, item in enumerate(items, start=1):
            bbox = item.get("bbox") if isinstance(item, dict) else None
            if not bbox:
                continue
            crop_id = make_crop_id(prefix, doc_id, page_num, idx)
            dest = CROPS_BASE / doc_id / f"{crop_id}.png"
            created, reason = crop_bbox(
                src_png=src_png, dest_png=dest, bbox=bbox,
                padding=padding, force=force,
            )
            if created:
                counts["created"] += 1
            elif reason == "exists":
                counts["skipped"] += 1
            else:
                # Surface the reason; a bare error count is not actionable.
                sys.stderr.write(f"  ✗ {crop_id}: {reason}\n")
                counts["error"] += 1

    return counts
|
|
|
|
|
|
def main():
    """CLI entry point: crop bboxes for one document or for every page.

    Parses ``--doc-id`` / ``--all`` / ``--force``, runs process_page() on each
    page markdown file, prints a per-document and overall summary, and
    appends a run entry to LOG_PATH when anything was created or failed.
    """
    ap = argparse.ArgumentParser(description="Eager-crop bounding boxes from page PNGs.")
    g = ap.add_mutually_exclusive_group()
    g.add_argument("--doc-id", help="restrict to a single doc_id")
    # --all is the default behaviour; the flag exists only for explicitness.
    g.add_argument("--all", action="store_true", help="process all pages (default)")
    ap.add_argument("--force", action="store_true", help="overwrite existing crops")
    args = ap.parse_args()

    if args.doc_id:
        doc_dir = PAGES_BASE / args.doc_id
        if not doc_dir.is_dir():
            # Most likely a mistyped id; say so instead of silently doing nothing.
            sys.stderr.write(f"  ✗ no pages directory for doc_id: {doc_dir}\n")
        pages = sorted(doc_dir.glob("*.md"))
    else:
        pages = sorted(PAGES_BASE.rglob("*.md"))

    totals = {"created": 0, "skipped": 0, "error": 0}
    doc_summary: dict[str, dict] = {}

    print(f"Cropping bboxes from {len(pages)} page(s)...", flush=True)
    for page_md in pages:
        page_counts = process_page(page_md, args.force)
        # Pages are grouped by the name of the directory that contains them.
        doc_id = page_md.parent.name
        per_doc = doc_summary.setdefault(doc_id, {"created": 0, "skipped": 0, "error": 0})
        for key, value in page_counts.items():
            totals[key] += value
            per_doc[key] += value

    # Only documents where something happened are listed.
    for doc_id, c in sorted(doc_summary.items()):
        if c["created"] or c["error"]:
            print(f"  {doc_id}: created={c['created']} skipped={c['skipped']} error={c['error']}", flush=True)

    print(f"\nTotal: created={totals['created']}, skipped={totals['skipped']}, error={totals['error']}", flush=True)

    # Runs that only skipped existing crops are not logged.
    if totals["created"] > 0 or totals["error"] > 0:
        entry = (
            f"\n## {utc_now_iso()} — CROP BBOXES (Phase 5.5)\n"
            "- operator: archivist\n- script: scripts/05-crop-bboxes.py\n"
            f"- target: {args.doc_id or '(all)'}\n"
            f"- created: {totals['created']}\n- skipped: {totals['skipped']}\n- error: {totals['error']}\n"
        )
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(entry)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|