#!/usr/bin/env python3
"""
09-extract-uap-frames.py — Extract key UAP frames from videos via ffmpeg

For each video analyzed by 08-video-analysis.py, read the Gemini JSON output
and extract still frames at the moments where the UAP is visible:
  - first_visible_at (UAP enters frame)
  - midpoint (visual peak)
  - last_visible_at (UAP exits frame)
  - additional samples every 1s within the visible window

Frames are written to /Users/guto/ufo/processing/uap-frames/{video_id}/ as
JPEG at high quality (q=2). Filenames encode the timestamp as
frame-MM-SS[-fff]-label.jpg (the millisecond part is omitted on whole seconds):
  frame-00-00-first.jpg
  frame-00-02-mid.jpg
  frame-00-04-last.jpg
  frame-00-01-sample-00.jpg
  ...

The frame paths are appended back to the video's frontmatter under
`uap_frames` for traceability.

Usage:
  ./09-extract-uap-frames.py                           # all analyzed videos
  ./09-extract-uap-frames.py --video-id dod-111689005  # single video
  ./09-extract-uap-frames.py --force                   # re-extract
"""

from __future__ import annotations

import argparse
import json
import math
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

try:
    import yaml
except ImportError:
    # Reported (message + exit code 1) as the first thing main() does, so the
    # script still fails fast while the module itself has no import-time exit.
    yaml = None

UFO_ROOT = Path("/Users/guto/ufo")
VIDEOS_DIR = UFO_ROOT / "raw" / "videos"
ANALYSIS_DIR = UFO_ROOT / "processing" / "video-analysis"
FRAMES_DIR = UFO_ROOT / "processing" / "uap-frames"
WIKI_VIDEOS_DIR = UFO_ROOT / "wiki" / "videos"

MISSING_YAML_MSG = "Missing pyyaml. Run: pip3 install pyyaml"

# A frontmatter delimiter is a line consisting solely of `---`; matching whole
# lines keeps a `---` that appears inside a YAML value from ending the block.
_FRONTMATTER_CLOSE = re.compile(r"^---[ \t]*\r?$", re.MULTILINE)


def _as_dict(value: object) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_timestamp(ts: str | float | None) -> float | None:
    """Parse 'mm:ss', 'h:mm:ss', 'ss' or a plain number into seconds.

    Returns None for missing, malformed, negative or non-finite input so that
    callers never hand ffmpeg an unusable seek position.
    """
    if ts is None or isinstance(ts, bool):
        return None
    if isinstance(ts, (int, float)):
        # JSON may carry the timestamp as a number instead of a string.
        seconds = float(ts)
    else:
        parts = str(ts).strip().split(":")
        if len(parts) > 3:
            return None
        try:
            seconds = float(parts[-1])
            # Remaining fields, right to left, are whole minutes then hours.
            for weight, field in zip((60, 3600), reversed(parts[:-1])):
                seconds += int(field) * weight
        except ValueError:
            return None
    if not math.isfinite(seconds) or seconds < 0:
        return None
    return seconds


def find_video_path(video_id: str) -> Path | None:
    """Map video_id back to the original .mp4 in raw/videos/.

    A file matches when its stem, lower-cased with every run of
    non-alphanumerics collapsed to '-', equals video_id either directly or
    with a 'vid-' prefix. The extension is compared case-insensitively and
    candidates are visited in sorted order so the result is deterministic.
    """
    for candidate in sorted(VIDEOS_DIR.glob("*")):
        if candidate.suffix.lower() != ".mp4":
            continue
        # video_id is lowercase kebab; raw is uppercase with underscores
        normalized = re.sub(r"[^a-z0-9]+", "-", candidate.stem.lower()).strip("-")
        if video_id in (normalized, f"vid-{normalized}"):
            return candidate
    return None


def extract_frame(video_path: Path, timestamp_s: float, out_path: Path) -> bool:
    """Extract a single JPEG frame at the given timestamp using ffmpeg.

    Returns True only when ffmpeg exits with 0 and a non-empty file exists at
    out_path. Failures are reported on stderr and never raise.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        "ffmpeg",
        "-y",  # overwrite
        "-loglevel", "error",
        "-ss", f"{timestamp_s:.3f}",
        "-i", str(video_path),
        "-frames:v", "1",
        "-q:v", "2",  # high quality JPEG
        str(out_path),
    ]
    try:
        # errors="replace": undecodable bytes in ffmpeg's stderr must not raise.
        res = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace", check=False
        )
    except OSError as exc:
        # Typically: the ffmpeg binary is not installed / not on PATH.
        sys.stderr.write(f" ✗ could not run ffmpeg: {exc}\n")
        return False

    has_output = out_path.is_file() and out_path.stat().st_size > 0
    if res.returncode == 0 and has_output:
        return True

    if out_path.is_file() and out_path.stat().st_size == 0:
        # A zero-byte leftover would make later runs report "skipped-existing".
        try:
            out_path.unlink()
        except OSError:
            pass
    sys.stderr.write(f" ✗ ffmpeg failed for {timestamp_s:.2f}s: {res.stderr[:200]}\n")
    return False


def collect_extraction_points(analysis: dict) -> list[tuple[float, str]]:
    """Return list of (timestamp_seconds, label) to extract.

    With usable first/last visibility timestamps the result holds 'first',
    optionally 'mid' and 'last', then 'sample-NN' points every second inside
    the window. Without any timestamps, up to five points spread evenly over
    `duration_seconds` are returned, or nothing when the duration is unknown.
    """
    uap = _as_dict(analysis.get("uap_observation_fields"))
    first = parse_timestamp(uap.get("first_visible_at", ""))
    last = parse_timestamp(uap.get("last_visible_at", ""))
    overview = _as_dict(analysis.get("video_overview"))
    duration = parse_timestamp(overview.get("duration_seconds")) or 0.0

    if first is None and last is None:
        # No UAP timestamps — sample evenly
        if duration <= 0:
            return []
        count = min(5, int(duration) + 1)
        # Divide by the number of samples actually taken so that short clips
        # are covered end to end instead of only near their start.
        return [
            (duration * (i + 0.5) / count, f"sample-{i:02d}") for i in range(count)
        ]

    first = first if first is not None else 0.0
    last = last if last is not None else first + 1.0
    if last <= first:
        last = first + 0.5

    # Always include first; mid and last only when far enough from first
    points: list[tuple[float, str]] = [(first, "first")]
    mid = (first + last) / 2
    if abs(mid - first) > 0.4:
        points.append((mid, "mid"))
    if last - first > 0.6:
        points.append((last, "last"))

    # Sample every ~1s within window
    cur = first + 1.0
    sample_idx = 0
    while cur < last - 0.2:
        # avoid duplicating mid
        if abs(cur - mid) > 0.5:
            points.append((cur, f"sample-{sample_idx:02d}"))
            sample_idx += 1
        cur += 1.0
    return points


def format_filename(t: float, label: str) -> str:
    """Build 'frame-MM-SS[-fff]-label.jpg' for timestamp *t* (seconds).

    The timestamp is rounded to whole milliseconds first, so the seconds field
    can never render as 60; the '-fff' part is omitted when it would be 000.
    """
    total_ms = max(0, round(t * 1000))
    minutes, rem_ms = divmod(total_ms, 60_000)
    seconds, millis = divmod(rem_ms, 1000)
    stamp = f"{minutes:02d}-{seconds:02d}"
    if millis:
        stamp += f"-{millis:03d}"
    return f"frame-{stamp}-{label}.jpg"


def process_video(video_id: str, force: bool = False) -> dict:
    """Extract all frames for one video and report the outcome.

    Returns {"video_id", "status", "frames"} where frames are paths relative
    to UFO_ROOT and status is one of: "ok", "no-analysis",
    "bad-analysis-json", "no-source-video", "skipped-existing",
    "no-extraction-points", "extraction-failed".
    """

    def result(status: str, frames: list[str] | None = None) -> dict:
        return {"video_id": video_id, "status": status, "frames": frames or []}

    json_path = ANALYSIS_DIR / f"{video_id}.json"
    if not json_path.exists():
        return result("no-analysis")
    try:
        data = json.loads(json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # One unreadable analysis file must not abort the whole batch.
        sys.stderr.write(f" ✗ cannot read {json_path.name}: {exc}\n")
        return result("bad-analysis-json")
    # Tolerates `"analysis": null` and non-object top-level JSON.
    analysis = _as_dict(_as_dict(data).get("analysis"))

    video_path = find_video_path(video_id)
    if not video_path:
        return result("no-source-video")

    frames_subdir = FRAMES_DIR / video_id
    if frames_subdir.exists() and not force and any(frames_subdir.glob("*.jpg")):
        existing = [
            str(p.relative_to(UFO_ROOT)) for p in sorted(frames_subdir.glob("*.jpg"))
        ]
        return result("skipped-existing", existing)

    points = collect_extraction_points(analysis)
    if not points:
        return result("no-extraction-points")

    frames_subdir.mkdir(parents=True, exist_ok=True)
    extracted = []
    for t, label in points:
        fname = format_filename(t, label)
        out = frames_subdir / fname
        if extract_frame(video_path, t, out):
            extracted.append(str(out.relative_to(UFO_ROOT)))
            print(f" ✓ {video_id} @ {t:6.2f}s [{label:8}] → {fname}", flush=True)

    if not extracted:
        # Every ffmpeg call failed: do not report success, so the caller does
        # not overwrite the frontmatter with an empty frame list.
        return result("extraction-failed")
    return result("ok", extracted)


def append_frames_to_md(video_id: str, frames: list[str]) -> bool:
    """Add `uap_frames` list to the wiki/videos/{video_id}.md frontmatter.

    Also stamps `uap_frames_extracted_at` with the current UTC time. Returns
    True when the file was rewritten and False when it was left untouched
    (file missing, no frontmatter block, unparsable or non-mapping YAML).

    Raises:
        RuntimeError: if PyYAML is not installed.
    """
    if yaml is None:
        raise RuntimeError(MISSING_YAML_MSG)
    md_path = WIKI_VIDEOS_DIR / f"{video_id}.md"
    if not md_path.exists():
        return False
    content = md_path.read_text(encoding="utf-8")
    if content.split("\n", 1)[0].rstrip() != "---":
        return False
    # Start after the opening delimiter; `^` only matches after a newline
    # there, so the opening line itself can never be taken as the closing one.
    closing = _FRONTMATTER_CLOSE.search(content, 3)
    if closing is None:
        return False
    try:
        fm = yaml.safe_load(content[3 : closing.start()]) or {}
    except yaml.YAMLError:
        return False
    if not isinstance(fm, dict):
        # Frontmatter that is a list or scalar cannot take new keys.
        return False
    body = content[closing.end() :].lstrip("\r\n")
    fm["uap_frames"] = frames
    fm["uap_frames_extracted_at"] = datetime.now(timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    yaml_str = yaml.dump(
        fm, allow_unicode=True, sort_keys=False, default_flow_style=False
    )
    md_path.write_text(f"---\n{yaml_str}---\n\n{body}", encoding="utf-8")
    return True


def main():
    """CLI entry point: extract frames for one or all analyzed videos."""
    if yaml is None:
        sys.stderr.write(MISSING_YAML_MSG + "\n")
        sys.exit(1)

    ap = argparse.ArgumentParser(
        description="Extract UAP frames from analyzed videos via ffmpeg."
    )
    g = ap.add_mutually_exclusive_group()
    g.add_argument("--video-id", help="single video id (e.g. dod-111689005)")
    g.add_argument("--all", action="store_true", help="all analyzed videos (default)")
    ap.add_argument("--force", action="store_true", help="re-extract even if frames exist")
    args = ap.parse_args()

    if args.video_id:
        targets = [args.video_id]
    else:
        targets = sorted(p.stem for p in ANALYSIS_DIR.glob("*.json"))

    print(f"Processing {len(targets)} video(s)…")
    for vid in targets:
        res = process_video(vid, force=args.force)
        if res["status"] != "ok":
            print(f" → {vid}: {res['status']}")
            continue
        updated = append_frames_to_md(vid, res["frames"])
        md_note = "md updated" if updated else "md not updated"
        print(f" → {vid}: {len(res['frames'])} frames extracted, {md_note}")


if __name__ == "__main__":
    main()