234 lines
8 KiB
Python
Executable file
234 lines
8 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
09-extract-uap-frames.py — Extract key UAP frames from videos via ffmpeg
|
|
|
|
For each video analyzed by 08-video-analysis.py, read the Gemini JSON output
|
|
and extract still frames at the moments where the UAP is visible:
|
|
- first_visible_at (UAP enters frame)
|
|
- midpoint (visual peak)
|
|
- last_visible_at (UAP exits frame)
|
|
- additional samples every 1s within the visible window
|
|
|
|
Frames are written to /Users/guto/ufo/processing/uap-frames/<video-id>/
|
|
as JPEG at high quality (q=2). Filenames encode the timestamp as minutes,
then seconds with hundredths (MM-SS_ff), followed by a label:
  frame-00-00_00-first.jpg
  frame-00-02_00-mid.jpg
  frame-00-04_00-last.jpg
  frame-00-01_00-sample-00.jpg
  ...
|
|
|
|
The frame paths are appended back to the video's frontmatter under
|
|
`uap_frames` for traceability.
|
|
|
|
Usage:
|
|
./09-extract-uap-frames.py # all analyzed videos
|
|
./09-extract-uap-frames.py --video-id dod-111689005 # single video
|
|
./09-extract-uap-frames.py --force # re-extract
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Root of the archive; every other location below is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")

# Source .mp4 files looked up by find_video_path().
VIDEOS_DIR = UFO_ROOT.joinpath("raw", "videos")
# Per-video analysis JSON files, named <video-id>.json.
ANALYSIS_DIR = UFO_ROOT.joinpath("processing", "video-analysis")
# Output root; frames are written to <FRAMES_DIR>/<video-id>/.
FRAMES_DIR = UFO_ROOT.joinpath("processing", "uap-frames")
# Markdown pages whose frontmatter receives the `uap_frames` list.
WIKI_VIDEOS_DIR = UFO_ROOT.joinpath("wiki", "videos")
|
|
|
|
|
|
def parse_timestamp(ts: str | float | int | None) -> float | None:
    """Convert a timestamp into a number of seconds.

    Accepted string forms are ``"ss"``, ``"mm:ss"`` and ``"h:mm:ss"``; the
    seconds component may carry a fractional part. A bare ``int`` or
    ``float`` is taken as a number of seconds, so a JSON field that holds a
    number instead of a string no longer raises ``AttributeError``.

    Args:
        ts: The timestamp text (or number) to parse; may be ``None`` or empty.

    Returns:
        The offset in seconds as a float, or ``None`` when the value is
        missing, malformed, has more than three ``:``-separated fields, or
        is not a finite number.
    """
    # bool is a subclass of int; True/False are never meaningful timestamps.
    if ts is None or isinstance(ts, bool):
        return None

    if isinstance(ts, (int, float)):
        seconds = float(ts)
    elif isinstance(ts, str):
        fields = ts.strip().split(":")
        try:
            if len(fields) == 1:
                seconds = float(fields[0])
            elif len(fields) == 2:
                seconds = int(fields[0]) * 60 + float(fields[1])
            elif len(fields) == 3:
                seconds = (
                    int(fields[0]) * 3600
                    + int(fields[1]) * 60
                    + float(fields[2])
                )
            else:
                return None
        except (ValueError, OverflowError):
            # Empty text, non-numeric text, or an absurdly large component.
            return None
    else:
        return None

    # NaN compares unequal to itself; infinities are rejected explicitly so
    # downstream int()/formatting code never sees a non-finite value.
    if seconds != seconds or abs(seconds) == float("inf"):
        return None
    return seconds
|
|
|
|
|
|
def find_video_path(video_id: str) -> Path | None:
    """Locate the raw ``.mp4`` in ``VIDEOS_DIR`` that corresponds to *video_id*.

    Each file stem is lowercased and every run of characters outside
    ``[a-z0-9]`` is collapsed into a single hyphen (leading and trailing
    hyphens removed). A file matches when that slug equals *video_id*
    either directly or after being prefixed with ``vid-``.

    Returns:
        The first matching path in glob order, or ``None`` if nothing matches.
    """

    def slugify(stem: str) -> str:
        # Raw stems are uppercase with underscores; ids are lowercase kebab.
        return re.sub(r"[^a-z0-9]+", "-", stem.lower()).strip("-")

    for candidate in VIDEOS_DIR.glob("*.mp4"):
        slug = slugify(candidate.stem)
        if video_id in (slug, f"vid-{slug}"):
            return candidate
    return None
|
|
|
|
|
|
def extract_frame(video_path: Path, timestamp_s: float, out_path: Path) -> bool:
    """Grab one frame of *video_path* at *timestamp_s* and save it to *out_path*.

    The parent directory of *out_path* is created if needed. ffmpeg is run
    with ``-y`` (overwrite), ``-ss`` placed before ``-i``, a single video
    frame requested, and ``-q:v 2`` for the JPEG quality.

    Returns:
        ``True`` when ffmpeg exits with status 0 and a non-empty output file
        exists; otherwise ``False`` after writing a message (including up to
        200 characters of ffmpeg's stderr) to standard error.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    seek_position = f"{timestamp_s:.3f}"
    command = ["ffmpeg", "-y", "-loglevel", "error"]
    command += ["-ss", seek_position, "-i", str(video_path)]
    command += ["-frames:v", "1", "-q:v", "2", str(out_path)]

    proc = subprocess.run(command, capture_output=True, text=True, check=False)

    # A zero exit status alone is not trusted: the file must exist and
    # contain data as well.
    if proc.returncode == 0 and out_path.exists() and out_path.stat().st_size > 0:
        return True

    sys.stderr.write(f" ✗ ffmpeg failed for {timestamp_s:.2f}s: {proc.stderr[:200]}\n")
    return False
|
|
|
|
|
|
def collect_extraction_points(analysis: dict) -> list[tuple[float, str]]:
    """Return the ``(timestamp_seconds, label)`` pairs to extract for a video.

    Timestamps come from ``uap_observation_fields.first_visible_at`` and
    ``last_visible_at``, parsed with ``parse_timestamp``.

    * If neither parses, up to five frames labelled ``sample-NN`` are spread
      evenly across ``video_overview.duration_seconds``. Each sample sits at
      the centre of its equal slice of the video, so short videos (fewer
      than five samples) are still covered end to end. A missing,
      non-numeric, non-finite or non-positive duration yields no points.
    * Otherwise a window ``[first, last]`` is built (a missing start
      defaults to 0, a missing end to ``first + 1``, and an end that is not
      after the start to ``first + 0.5``) and the result contains:
      ``first``; ``mid`` when it lies more than 0.4 s after the start;
      ``last`` when the window is longer than 0.6 s; and ``sample-NN``
      points every second from ``first + 1`` up to 0.2 s before the end,
      skipping any that fall within 0.5 s of the midpoint.

    Args:
        analysis: The per-video analysis mapping; ``None`` is treated as empty.

    Returns:
        The extraction points, in the order described above (not sorted by
        time).
    """
    analysis = analysis or {}
    uap = analysis.get("uap_observation_fields") or {}
    first = parse_timestamp(uap.get("first_visible_at", ""))
    last = parse_timestamp(uap.get("last_visible_at", ""))

    if first is None and last is None:
        # No UAP timestamps: fall back to evenly spaced samples.
        overview = analysis.get("video_overview") or {}
        try:
            duration = float(overview.get("duration_seconds") or 0)
        except (TypeError, ValueError):
            duration = 0.0
        # Also rejects NaN (all comparisons false) and infinity.
        if not 0 < duration < float("inf"):
            return []
        count = min(5, int(duration) + 1)
        # Divide by the number of samples actually taken, not a fixed 5,
        # so the samples span the whole video.
        return [
            (duration * (i + 0.5) / count, f"sample-{i:02d}")
            for i in range(count)
        ]

    first = first if first is not None else 0.0
    last = last if last is not None else first + 1.0
    if last <= first:
        last = first + 0.5

    points: list[tuple[float, str]] = [(first, "first")]
    mid = (first + last) / 2
    if abs(mid - first) > 0.4:
        points.append((mid, "mid"))
    if last - first > 0.6:
        points.append((last, "last"))

    # One sample per second inside the window.
    cur = first + 1.0
    sample_idx = 0
    while cur < last - 0.2:
        # Skip samples that would nearly duplicate the midpoint frame.
        if abs(cur - mid) > 0.5:
            points.append((cur, f"sample-{sample_idx:02d}"))
            sample_idx += 1
        cur += 1.0

    return points
|
|
|
|
|
|
def format_filename(t: float, label: str) -> str:
    """Build the JPEG filename for the frame at *t* seconds.

    The result has the form ``frame-MM-SS_ff-<label>.jpg`` where ``MM`` is
    whole minutes, ``SS`` whole seconds and ``ff`` hundredths of a second.
    The label is kept intact; previously its last character was sliced off.

    Args:
        t: Offset into the video, in seconds.
        label: Tag for the frame, e.g. ``"first"`` or ``"sample-03"``.

    Returns:
        The filename (no directory component).
    """
    # Round to whole centiseconds before splitting, so a value such as
    # 59.999 s rolls over to 01-00_00 instead of rendering as 00-60_00.
    total_centiseconds = round(t * 100)
    minutes, remainder = divmod(total_centiseconds, 6000)
    seconds, hundredths = divmod(remainder, 100)
    return f"frame-{minutes:02d}-{seconds:02d}_{hundredths:02d}-{label}.jpg"
|
|
|
|
|
|
def process_video(video_id: str, force: bool = False) -> dict:
    """Extract the frames for one analyzed video.

    Args:
        video_id: Id of the video; its analysis is read from
            ``ANALYSIS_DIR/<video_id>.json``.
        force: When true, extract even if ``.jpg`` files already exist in
            the video's frame directory.

    Returns:
        A dict with keys ``video_id``, ``status`` and ``frames`` (paths
        relative to ``UFO_ROOT``). ``status`` is one of:

        * ``"no-analysis"``: the analysis JSON file does not exist.
        * ``"invalid-analysis"``: the file cannot be read or parsed.
        * ``"no-source-video"``: no matching raw video was found.
        * ``"skipped-existing"``: frames already exist and *force* is false;
          ``frames`` lists the existing files.
        * ``"no-extraction-points"``: the analysis yields nothing to extract.
        * ``"extraction-failed"``: every ffmpeg extraction failed.
        * ``"ok"``: at least one frame was written.
    """

    def result(status: str, frames: list[str] | None = None) -> dict:
        return {"video_id": video_id, "status": status, "frames": frames or []}

    json_path = ANALYSIS_DIR / f"{video_id}.json"
    if not json_path.exists():
        return result("no-analysis")

    # One unreadable or corrupt file must not abort a whole batch run.
    # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        data = json.loads(json_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        sys.stderr.write(f" ✗ cannot read analysis {json_path}: {exc}\n")
        return result("invalid-analysis")

    # Tolerate a non-object document and an explicit `"analysis": null`.
    analysis = data.get("analysis") if isinstance(data, dict) else None
    if not isinstance(analysis, dict):
        analysis = {}

    video_path = find_video_path(video_id)
    if not video_path:
        return result("no-source-video")

    frames_subdir = FRAMES_DIR / video_id
    if frames_subdir.exists() and not force:
        existing = [
            str(p.relative_to(UFO_ROOT))
            for p in sorted(frames_subdir.glob("*.jpg"))
        ]
        if existing:
            return result("skipped-existing", existing)

    points = collect_extraction_points(analysis)
    if not points:
        return result("no-extraction-points")

    frames_subdir.mkdir(parents=True, exist_ok=True)
    extracted = []
    for t, label in points:
        fname = format_filename(t, label)
        out = frames_subdir / fname
        if extract_frame(video_path, t, out):
            extracted.append(str(out.relative_to(UFO_ROOT)))
            print(f" ✓ {video_id} @ {t:6.2f}s [{label:8}] → {fname}", flush=True)

    # Reporting "ok" with no frames would make the caller record an empty
    # frame list as if extraction had succeeded.
    if not extracted:
        return result("extraction-failed")
    return result("ok", extracted)
|
|
|
|
|
|
def append_frames_to_md(video_id: str, frames: list[str]):
    """Record *frames* in the frontmatter of ``WIKI_VIDEOS_DIR/<video_id>.md``.

    Sets ``uap_frames`` to *frames* and ``uap_frames_extracted_at`` to the
    current UTC time, then rewrites the file as the re-serialized
    frontmatter followed by the original body.

    The function returns without touching the file when the page does not
    exist, does not start with ``---``, has no closing ``---`` line, or its
    frontmatter is not valid YAML or not a mapping.

    Args:
        video_id: Id of the video whose wiki page is updated.
        frames: Frame paths to store under ``uap_frames``.
    """
    md_path = WIKI_VIDEOS_DIR / f"{video_id}.md"
    if not md_path.exists():
        return
    content = md_path.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return

    # The closing delimiter must be a line of its own. A plain substring
    # search would stop at a "---" inside a YAML value (e.g. a title) and
    # truncate the frontmatter. Searching from offset 3 skips the opening
    # delimiter; "^" still only matches right after a newline.
    closing = re.compile(r"^---[ \t]*$", re.MULTILINE).search(content, 3)
    if closing is None:
        return

    try:
        fm = yaml.safe_load(content[3 : closing.start()].strip()) or {}
    except yaml.YAMLError:
        return
    # Frontmatter that parses to a list or scalar cannot take new keys.
    if not isinstance(fm, dict):
        return
    body = content[closing.end() :].lstrip("\n")

    fm["uap_frames"] = frames
    fm["uap_frames_extracted_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    yaml_str = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    md_path.write_text(f"---\n{yaml_str}---\n\n{body}", encoding="utf-8")
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses ``--video-id`` / ``--all`` (mutually exclusive) and ``--force``,
    then runs ``process_video`` on either the single requested id or on
    every ``*.json`` stem found in ``ANALYSIS_DIR`` (sorted). For each
    video whose status is ``"ok"`` the wiki frontmatter is updated via
    ``append_frames_to_md``; any other status is just printed.
    """
    parser = argparse.ArgumentParser(description="Extract UAP frames from analyzed videos via ffmpeg.")
    selection = parser.add_mutually_exclusive_group()
    selection.add_argument("--video-id", help="single video id (e.g. dod-111689005)")
    # Accepted for explicitness only: processing everything is the default.
    selection.add_argument("--all", action="store_true", help="all analyzed videos (default)")
    parser.add_argument("--force", action="store_true", help="re-extract even if frames exist")
    options = parser.parse_args()

    targets = (
        [options.video_id]
        if options.video_id
        else sorted(p.stem for p in ANALYSIS_DIR.glob("*.json"))
    )

    print(f"Processing {len(targets)} video(s)…")
    for vid in targets:
        outcome = process_video(vid, force=options.force)
        if outcome["status"] != "ok":
            print(f" → {vid}: {outcome['status']}")
            continue
        append_frames_to_md(vid, outcome["frames"])
        print(f" → {vid}: {len(outcome['frames'])} frames extracted, md updated")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    # main() returns None, so this exits with status 0 after it completes.
    sys.exit(main())
|