201 lines
7.1 KiB
Python
Executable file
201 lines
7.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
"""
|
||
12-incremental-orchestrator.py — Wait-and-process loop
|
||
|
||
Polls for newly-completed Gemini video analyses, then runs the downstream
|
||
steps (frame extraction + case-images generation) per video. Idempotent by
|
||
construction — each step checks the output and skips if already done.
|
||
|
||
A video is "ready for downstream" when BOTH exist:
|
||
- processing/video-analysis/<id>.json (Gemini analysis output)
|
||
- wiki/videos/<id>.md (markdown rendered by script 08)
|
||
|
||
For each ready video, this loop will:
|
||
1. If no `processing/uap-frames/<id>/*.jpg` exists → run 09-extract-uap-frames.py
|
||
2. If `processing/case-images/<id>/case-nanobanana.png` is missing, OR
   `processing/case-images/<id>/case-codex.png` is missing and Codex images
   are wanted (no --skip-codex) → run 11-generate-case-images.py
|
||
(steps 1 and 2 always check their own outputs first; they never re-do work)
|
||
|
||
Termination:
|
||
- Stops when all videos in raw/videos/ are fully downstream-processed
|
||
- Or when --max-iterations is reached
|
||
- Or on SIGINT (Ctrl+C)
|
||
|
||
Usage:
|
||
./12-incremental-orchestrator.py # poll every 90s
|
||
./12-incremental-orchestrator.py --interval 60 # custom poll interval
|
||
./12-incremental-orchestrator.py --max-iterations 50
|
||
./12-incremental-orchestrator.py --skip-codex # only Nano Banana case images
./12-incremental-orchestrator.py --skip-nano  # only Codex case images
|
||
./12-incremental-orchestrator.py --once # single pass, no loop
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import unicodedata
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
|
||
# Root of the UFO archive; every other path below hangs off it.
UFO_ROOT = Path("/Users/guto/ufo")

# Inputs: source videos (globbed for *.mp4), per-video analysis JSON, and
# the per-video wiki markdown that together mark a video "ready".
VIDEOS_DIR = UFO_ROOT.joinpath("raw", "videos")
ANALYSIS_DIR = UFO_ROOT.joinpath("processing", "video-analysis")
WIKI_VIDEOS_DIR = UFO_ROOT.joinpath("wiki", "videos")

# Downstream outputs checked/produced by this orchestrator.
FRAMES_DIR = UFO_ROOT.joinpath("processing", "uap-frames")
CASE_IMAGES_DIR = UFO_ROOT.joinpath("processing", "case-images")

# Location of the numbered pipeline scripts, and the run log appended to on exit.
SCRIPTS = UFO_ROOT.joinpath("scripts")
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")
|
||
|
||
|
||
def now() -> str:
    """Return the current UTC wall-clock time as an ``HH:MM:SS`` string."""
    utc_moment = datetime.now(tz=timezone.utc)
    return utc_moment.strftime("%H:%M:%S")
|
||
|
||
|
||
def filename_to_video_id(name: str) -> str:
    """Map a video filename to its canonical video id.

    Steps, in order:
      1. drop the extension (everything after the last ``.``, if any);
      2. NFKD-normalize and remove combining marks (strips accents);
      3. lower-case;
      4. replace every character outside ``a-z``, ``0-9``, ``-`` with ``-``;
      5. collapse runs of ``-`` and trim them from both ends;
      6. prefix ``vid-`` when the id would otherwise start with a digit.

    May return an empty string if nothing survives step 5.
    """
    stem = name[: name.rfind(".")] if "." in name else name
    decomposed = unicodedata.normalize("NFKD", stem)
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    slug = re.sub(r"[^a-z0-9-]", "-", without_marks.lower())
    slug = re.sub(r"-+", "-", slug).strip("-")
    # slug[:1] is "" for an empty slug, and "".isdigit() is False.
    return f"vid-{slug}" if slug[:1].isdigit() else slug
|
||
|
||
|
||
def list_all_video_ids() -> list[str]:
    """Return the sorted canonical ids of every ``*.mp4`` directly in raw/videos/."""
    ids = [filename_to_video_id(mp4.name) for mp4 in VIDEOS_DIR.glob("*.mp4")]
    ids.sort()
    return ids
|
||
|
||
|
||
def is_analyzed(video_id: str) -> bool:
    """Return True when *video_id* is ready for downstream processing.

    Ready means both upstream artifacts exist: the analysis JSON in
    ANALYSIS_DIR and the rendered markdown in WIKI_VIDEOS_DIR.
    """
    required = (
        ANALYSIS_DIR / f"{video_id}.json",
        WIKI_VIDEOS_DIR / f"{video_id}.md",
    )
    # all() short-circuits, so the markdown is only stat'ed if the JSON exists.
    return all(path.exists() for path in required)
|
||
|
||
|
||
def has_frames(video_id: str) -> bool:
    """Return True if FRAMES_DIR/<video_id>/ exists and contains at least one .jpg."""
    frame_dir = FRAMES_DIR / video_id
    if not frame_dir.exists():
        return False
    return next(frame_dir.glob("*.jpg"), None) is not None
|
||
|
||
|
||
def has_case_images(video_id: str, want_codex: bool) -> bool:
    """Return True when the case images for *video_id* are present.

    ``case-nanobanana.png`` is always required; ``case-codex.png`` is
    required only when *want_codex* is true.
    """
    image_dir = CASE_IMAGES_DIR / video_id
    return (image_dir / "case-nanobanana.png").exists() and (
        not want_codex or (image_dir / "case-codex.png").exists()
    )
|
||
|
||
|
||
def run(cmd: list[str], description: str, timeout: float = 900) -> bool:
    """Run *cmd* as a child process, letting its stdout/stderr stream through.

    Args:
        cmd: argv list, executed without a shell.
        description: human-readable label used in progress/failure messages.
        timeout: seconds before the child is killed (default 900 = 15 min).

    Returns:
        True only if the child exited with status 0. A non-zero exit, a
        timeout, or a failure to launch the command each print a message and
        return False, so one failing step does not abort the whole loop.
    """
    print(f" [{now()}] → {description}", flush=True)
    try:
        res = subprocess.run(cmd, timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before re-raising TimeoutExpired.
        print(f" [{now()}] ✗ timeout on {description}", flush=True)
        return False
    except OSError as exc:
        # e.g. the interpreter/script path does not exist or is not executable.
        print(f" [{now()}] ✗ could not start {description}: {exc}", flush=True)
        return False
    if res.returncode != 0:
        print(f" [{now()}] ✗ exit code {res.returncode} on {description}", flush=True)
        return False
    return True
|
||
|
||
|
||
def _case_images_complete(video_id: str, skip_codex: bool, skip_nano: bool) -> bool:
    """True when every case image the skip flags ask for already exists."""
    if not skip_nano:
        # Nano Banana wanted: the standard check (Codex required unless skipped).
        return has_case_images(video_id, want_codex=not skip_codex)
    if skip_codex:
        # Both generators disabled: nothing will be produced, so nothing is missing.
        return True
    # Codex-only run: the Nano Banana image is not expected.
    return (CASE_IMAGES_DIR / video_id / "case-codex.png").exists()


def process_one_pass(skip_codex: bool, skip_nano: bool) -> tuple[int, int, int]:
    """Run one pass of the downstream steps over every ready video.

    For each id that ``is_analyzed`` reports ready:
      1. extract frames with script 09 if none exist yet (on failure, skip
         the rest of this video for this pass);
      2. generate case images with script 11 if any image the skip flags
         call for is missing.

    Args:
        skip_codex: don't generate or require ``case-codex.png``.
        skip_nano: don't generate or require ``case-nanobanana.png``.

    Returns:
        (videos_with_at_least_one_successful_step, ready_count, total_count)
    """
    all_ids = list_all_video_ids()
    ready_ids = [v for v in all_ids if is_analyzed(v)]
    actions_done = 0

    for vid in ready_ids:
        did_anything = False

        # Step 1: frames
        if not has_frames(vid):
            # sys.executable: run children under the same interpreter/venv as this script.
            cmd = [sys.executable, str(SCRIPTS / "09-extract-uap-frames.py"), "--video-id", vid]
            if not run(cmd, f"frames for {vid}"):
                continue  # don't proceed to case images if frames failed
            did_anything = True

        # Step 2: case images
        if not _case_images_complete(vid, skip_codex, skip_nano):
            cmd = [sys.executable, str(SCRIPTS / "11-generate-case-images.py"),
                   "--kind", "videos",
                   "--entity-id", vid]
            if skip_codex:
                cmd.append("--skip-codex")
            if skip_nano:
                cmd.append("--skip-nano")
            if run(cmd, f"case images for {vid}"):
                did_anything = True

        if did_anything:
            actions_done += 1

    return actions_done, len(ready_ids), len(all_ids)


def all_fully_processed(skip_codex: bool, skip_nano: bool = False) -> bool:
    """True when every video is analyzed AND has frames + the requested case images.

    Args:
        skip_codex: the Codex image is not required.
        skip_nano: the Nano Banana image is not required (default False keeps
            the previous behavior for existing callers).

    Returns False when raw/videos/ holds no videos, so with an empty
    directory the polling loop in ``main`` keeps running.
    """
    all_ids = list_all_video_ids()
    if not all_ids:
        return False
    return all(
        is_analyzed(v) and has_frames(v) and _case_images_complete(v, skip_codex, skip_nano)
        for v in all_ids
    )


def main() -> None:
    """CLI entry point: poll and process until a termination condition is hit.

    Stops after a single pass with ``--once``, when ``all_fully_processed``
    reports everything done, after ``--max-iterations`` passes, or on Ctrl+C.
    On any of those exits a summary entry is appended to ``wiki/log.md``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--interval", type=int, default=90, help="poll interval seconds (default 90)")
    ap.add_argument("--max-iterations", type=int, default=120, help="cap (120 × 90s = 3h)")
    ap.add_argument("--skip-codex", action="store_true",
                    help="don't generate or require Codex case images")
    ap.add_argument("--skip-nano", action="store_true",
                    help="don't generate or require Nano Banana case images")
    ap.add_argument("--once", action="store_true", help="single pass, no loop")
    args = ap.parse_args()
    if args.interval < 0:
        # time.sleep() raises ValueError on negative input; fail fast instead.
        ap.error("--interval must be >= 0")

    print(f"[{now()}] orchestrator started")
    print(f" interval={args.interval}s max_iterations={args.max_iterations}")
    print(f" skip_codex={args.skip_codex} skip_nano={args.skip_nano}")
    print(f" watching: {len(list_all_video_ids())} videos in raw/videos/")

    iteration = 0
    total_actions = 0
    try:
        while iteration < args.max_iterations:
            iteration += 1
            actions, ready, total = process_one_pass(args.skip_codex, args.skip_nano)
            total_actions += actions
            print(f"[{now()}] iter {iteration}: ready={ready}/{total}, "
                  f"actions_this_pass={actions}, total_actions={total_actions}", flush=True)

            if args.once:
                break

            if all_fully_processed(args.skip_codex, args.skip_nano):
                print(f"[{now()}] ✓ all {total} videos fully processed — exiting", flush=True)
                break

            # No point waiting a full interval after the last allowed pass.
            if iteration < args.max_iterations:
                time.sleep(args.interval)
    except KeyboardInterrupt:
        print(f"\n[{now()}] interrupted by user")

    # Log
    with open(LOG_PATH, "a", encoding="utf-8") as fh:
        fh.write(
            f"\n## {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')} — ORCHESTRATOR\n"
            f"- operator: archivist\n- script: scripts/12-incremental-orchestrator.py\n"
            f"- iterations: {iteration}\n- total_actions: {total_actions}\n"
        )
|
||
|
||
|
||
if __name__ == "__main__":
    # Start the polling loop only when executed as a script, never on import.
    main()
|