#!/usr/bin/env python3
"""
12-incremental-orchestrator.py — Wait-and-process loop

Polls for newly-completed Gemini video analyses, then runs the downstream
steps (frame extraction + case-images generation) per video. Idempotent by
construction — before running a step the orchestrator checks that step's
output files and skips the step if they already exist.

A video (any *.mp4 directly under raw/videos/) is "ready for downstream"
when BOTH exist:
  - processing/video-analysis/<video_id>.json  (Gemini analysis output)
  - wiki/videos/<video_id>.md                  (markdown rendered by script 08)

For each ready video, this loop will:
  1. If no `processing/uap-frames/<video_id>/*.jpg` exists
     → run 09-extract-uap-frames.py
  2. If any *requested* case image is missing from
     `processing/case-images/<video_id>/` — `case-nanobanana.png` (unless
     --skip-nano) or `case-codex.png` (unless --skip-codex)
     → run 11-generate-case-images.py
     (step 2 is skipped for a video whose step 1 failed in the same pass)

Termination:
  - Stops when all videos in raw/videos/ are fully downstream-processed
  - Or when --max-iterations is reached
  - Or on SIGINT (Ctrl+C)

Usage:
  ./12-incremental-orchestrator.py                      # poll every 90s
  ./12-incremental-orchestrator.py --interval 60        # custom poll interval
  ./12-incremental-orchestrator.py --max-iterations 50
  ./12-incremental-orchestrator.py --skip-codex         # only Nano Banana case images
  ./12-incremental-orchestrator.py --once               # single pass, no loop
"""
from __future__ import annotations

import argparse
import re
import subprocess
import sys
import time
import unicodedata
from datetime import datetime, timezone
from pathlib import Path

UFO_ROOT = Path("/Users/guto/ufo")
VIDEOS_DIR = UFO_ROOT / "raw" / "videos"
ANALYSIS_DIR = UFO_ROOT / "processing" / "video-analysis"
WIKI_VIDEOS_DIR = UFO_ROOT / "wiki" / "videos"
FRAMES_DIR = UFO_ROOT / "processing" / "uap-frames"
CASE_IMAGES_DIR = UFO_ROOT / "processing" / "case-images"
SCRIPTS = UFO_ROOT / "scripts"
LOG_PATH = UFO_ROOT / "wiki" / "log.md"

# Interpreter used for the child scripts: the one running this orchestrator,
# rather than whatever "python3" happens to resolve to on PATH. Falls back to
# "python3" if the interpreter path is unknown (sys.executable may be empty).
PYTHON = sys.executable or "python3"

# Default per-step subprocess timeout in seconds. A step that times out is
# counted as failed; since its outputs are still missing, it is attempted
# again on the next pass.
STEP_TIMEOUT_S = 900


def now() -> str:
    """Return the current UTC time formatted as HH:MM:SS (used as a log prefix)."""
    return datetime.now(timezone.utc).strftime("%H:%M:%S")


def filename_to_video_id(name: str) -> str:
    """Map a video filename to its canonical id (a lowercase slug).

    Steps: drop the last extension; NFKD-normalize and remove combining marks
    (so "é" becomes "e"); lowercase; replace every character outside
    [a-z0-9-] with "-"; collapse runs of "-"; trim leading/trailing "-".
    An id that would start with a digit gets a "vid-" prefix. Characters
    that do not decompose to ASCII (e.g. "ß") end up as "-".

    >>> filename_to_video_id("Café Video.mp4")
    'cafe-video'
    >>> filename_to_video_id("2024_UFO clip.mp4")
    'vid-2024-ufo-clip'
    """
    base = name.rsplit(".", 1)[0]
    decomposed = unicodedata.normalize("NFKD", base)
    ascii_s = "".join(c for c in decomposed if not unicodedata.combining(c))
    replaced = re.sub(r"[^a-z0-9-]", "-", ascii_s.lower())
    collapsed = re.sub(r"-+", "-", replaced).strip("-")
    if collapsed and collapsed[0].isdigit():
        collapsed = "vid-" + collapsed
    return collapsed


def list_all_video_ids() -> list[str]:
    """Return the sorted canonical ids of every *.mp4 directly under raw/videos/."""
    return sorted(filename_to_video_id(p.name) for p in VIDEOS_DIR.glob("*.mp4"))


def is_analyzed(video_id: str) -> bool:
    """Return True when the video is ready for downstream work.

    Both the Gemini analysis JSON and the wiki markdown (script 08) must exist.
    """
    analysis = ANALYSIS_DIR / f"{video_id}.json"
    markdown = WIKI_VIDEOS_DIR / f"{video_id}.md"
    return analysis.exists() and markdown.exists()


def has_frames(video_id: str) -> bool:
    """Return True when at least one extracted frame (*.jpg) exists for the video."""
    frame_dir = FRAMES_DIR / video_id
    return frame_dir.is_dir() and any(frame_dir.glob("*.jpg"))


def has_case_images(video_id: str, want_codex: bool, want_nano: bool = True) -> bool:
    """Return True when every *requested* case image exists for the video.

    Args:
        video_id: canonical id (see filename_to_video_id).
        want_codex: require processing/case-images/<id>/case-codex.png.
        want_nano: require processing/case-images/<id>/case-nanobanana.png.
            Defaults to True for backward compatibility. Pass False under
            --skip-nano; otherwise the never-generated Nano Banana image
            makes the video look unfinished on every pass.

    With both flags False, nothing is required and the result is True.
    """
    image_dir = CASE_IMAGES_DIR / video_id
    if want_nano and not (image_dir / "case-nanobanana.png").exists():
        return False
    if want_codex and not (image_dir / "case-codex.png").exists():
        return False
    return True


def run(cmd: list[str], description: str, timeout: float = STEP_TIMEOUT_S) -> bool:
    """Run a subprocess, streaming its output through to our console.

    The child inherits stdout/stderr. Returns True only on exit status 0.
    A timeout, a non-zero exit, or a failure to launch the command (OSError,
    e.g. executable not found) is reported and yields False, so a single
    broken step cannot abort the whole polling loop.
    """
    print(f" [{now()}] → {description}", flush=True)
    try:
        res = subprocess.run(cmd, timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        print(f" [{now()}] ✗ timeout on {description}", flush=True)
        return False
    except OSError as err:
        print(f" [{now()}] ✗ could not start {description}: {err}", flush=True)
        return False
    if res.returncode != 0:
        print(f" [{now()}] ✗ exit code {res.returncode} on {description}", flush=True)
        return False
    return True


def process_one_pass(skip_codex: bool, skip_nano: bool) -> tuple[int, int, int]:
    """Run the downstream steps once over every ready video.

    Returns (newly_processed, total_ready, total_videos). Here
    newly_processed counts videos for which at least one step succeeded
    during this pass.
    """
    all_ids = list_all_video_ids()
    ready_ids = [v for v in all_ids if is_analyzed(v)]
    want_codex, want_nano = not skip_codex, not skip_nano
    actions_done = 0

    for vid in ready_ids:
        did_anything = False

        # Step 1: frames
        if not has_frames(vid):
            cmd = [PYTHON, str(SCRIPTS / "09-extract-uap-frames.py"), "--video-id", vid]
            if not run(cmd, f"frames for {vid}"):
                continue  # don't proceed to case images if frames failed
            did_anything = True

        # Step 2: case images (only the kinds that were not skipped)
        if not has_case_images(vid, want_codex=want_codex, want_nano=want_nano):
            cmd = [
                PYTHON,
                str(SCRIPTS / "11-generate-case-images.py"),
                "--kind", "videos",
                "--entity-id", vid,
            ]
            if skip_codex:
                cmd.append("--skip-codex")
            if skip_nano:
                cmd.append("--skip-nano")
            if run(cmd, f"case images for {vid}"):
                did_anything = True

        if did_anything:
            actions_done += 1

    return actions_done, len(ready_ids), len(all_ids)


def all_fully_processed(skip_codex: bool, skip_nano: bool = False) -> bool:
    """Return True when every video is analyzed and has frames plus its requested case images.

    Returns False when raw/videos/ holds no *.mp4 at all, so the loop keeps
    polling instead of exiting on an empty directory.
    """
    all_ids = list_all_video_ids()
    if not all_ids:
        return False
    want_codex, want_nano = not skip_codex, not skip_nano
    return all(
        is_analyzed(v)
        and has_frames(v)
        and has_case_images(v, want_codex=want_codex, want_nano=want_nano)
        for v in all_ids
    )


def _append_run_log(iterations: int, total_actions: int) -> None:
    """Append a run-summary entry to wiki/log.md, warning (not crashing) on I/O errors."""
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    try:
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(
                f"\n## {stamp} — ORCHESTRATOR\n"
                f"- operator: archivist\n- script: scripts/12-incremental-orchestrator.py\n"
                f"- iterations: {iterations}\n- total_actions: {total_actions}\n"
            )
    except OSError as err:
        print(f"[{now()}] ✗ could not append to {LOG_PATH}: {err}", flush=True)


def main() -> None:
    """Parse CLI arguments, poll until done / capped / interrupted, then log the run."""
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--interval", type=int, default=90, help="poll interval seconds (default 90)")
    ap.add_argument("--max-iterations", type=int, default=120, help="cap (120 × 90s = 3h)")
    ap.add_argument("--skip-codex", action="store_true")
    ap.add_argument("--skip-nano", action="store_true")
    ap.add_argument("--once", action="store_true", help="single pass, no loop")
    args = ap.parse_args()
    # Reject values that would crash time.sleep or silently run zero passes.
    if args.interval < 0:
        ap.error("--interval must be >= 0")
    if args.max_iterations < 1:
        ap.error("--max-iterations must be >= 1")

    print(f"[{now()}] orchestrator started")
    print(f" interval={args.interval}s max_iterations={args.max_iterations}")
    print(f" skip_codex={args.skip_codex} skip_nano={args.skip_nano}")
    print(f" watching: {len(list_all_video_ids())} videos in raw/videos/")

    iteration = 0
    total_actions = 0
    try:
        while iteration < args.max_iterations:
            iteration += 1
            actions, ready, total = process_one_pass(args.skip_codex, args.skip_nano)
            total_actions += actions
            print(f"[{now()}] iter {iteration}: ready={ready}/{total}, "
                  f"actions_this_pass={actions}, total_actions={total_actions}", flush=True)
            if args.once:
                break
            if all_fully_processed(args.skip_codex, args.skip_nano):
                print(f"[{now()}] ✓ all {total} videos fully processed — exiting", flush=True)
                break
            # No point sleeping after the final permitted pass.
            if iteration < args.max_iterations:
                time.sleep(args.interval)
        else:
            # Loop ran out of iterations without a `break`.
            print(f"[{now()}] reached --max-iterations ({args.max_iterations}) — exiting",
                  flush=True)
    except KeyboardInterrupt:
        print(f"\n[{now()}] interrupted by user")

    _append_run_log(iteration, total_actions)


if __name__ == "__main__":
    main()