disclosure-bureau/scripts/12-incremental-orchestrator.py

202 lines
7.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
12-incremental-orchestrator.py — Wait-and-process loop.
Polls for newly-completed Gemini video analyses, then runs the downstream
steps (frame extraction + case-images generation) per video. Idempotent by
construction: each step checks its own output and skips if already done.
A video is "ready for downstream" when BOTH exist:
- processing/video-analysis/<id>.json (Gemini analysis output)
- wiki/videos/<id>.md (markdown rendered by script 08)
For each ready video, this loop will:
1. If no `processing/uap-frames/<id>/*.jpg` exists → run 09-extract-uap-frames.py
2. If any requested case image is missing, i.e.
`processing/case-images/<id>/case-nanobanana.png`, or
`processing/case-images/<id>/case-codex.png` (unless --skip-codex)
→ run 11-generate-case-images.py
(steps 1 and 2 always check their own outputs first; they never re-do work)
Termination:
- Stops when all videos in raw/videos/ are fully downstream-processed
- Or when --max-iterations is reached
- Or on SIGINT (Ctrl+C)
Usage:
./12-incremental-orchestrator.py # poll every 90s
./12-incremental-orchestrator.py --interval 60 # custom poll interval
./12-incremental-orchestrator.py --max-iterations 50
./12-incremental-orchestrator.py --skip-codex # only Nano Banana case images
./12-incremental-orchestrator.py --once # single pass, no loop
"""
from __future__ import annotations
import argparse
import re
import subprocess
import sys
import time
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
# Project root; every other path in this script is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")

# Inputs: the raw videos plus the per-video analysis JSON and wiki markdown.
VIDEOS_DIR = UFO_ROOT.joinpath("raw", "videos")
ANALYSIS_DIR = UFO_ROOT.joinpath("processing", "video-analysis")
WIKI_VIDEOS_DIR = UFO_ROOT.joinpath("wiki", "videos")

# Outputs of the two downstream steps this orchestrator drives.
FRAMES_DIR = UFO_ROOT.joinpath("processing", "uap-frames")
CASE_IMAGES_DIR = UFO_ROOT.joinpath("processing", "case-images")

# Sibling pipeline scripts, and the markdown log that main() appends to.
SCRIPTS = UFO_ROOT.joinpath("scripts")
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")
def now() -> str:
    """Return the current UTC time of day formatted as ``HH:MM:SS``."""
    utc_now = datetime.now(tz=timezone.utc)
    return f"{utc_now:%H:%M:%S}"
def filename_to_video_id(name: str) -> str:
    """Map a video file name to its canonical video id.

    The final extension is dropped, accents are removed (NFKD decomposition
    with combining marks discarded), the text is lower-cased, every character
    outside ``[a-z0-9-]`` becomes ``-``, runs of ``-`` are collapsed and
    trimmed from both ends, and an id that would start with a digit is
    prefixed with ``vid-``. An empty string is returned when nothing survives.
    """
    stem, dot, _extension = name.rpartition(".")
    if not dot:
        # No "." anywhere in the name: the whole name is the stem.
        stem = name

    decomposed = unicodedata.normalize("NFKD", stem)
    unaccented = "".join(ch for ch in decomposed if not unicodedata.combining(ch))

    slug = re.sub(r"[^a-z0-9-]", "-", unaccented.lower())
    slug = re.sub(r"-+", "-", slug).strip("-")

    # slug[:1] is "" for an empty slug, and "".isdigit() is False.
    return "vid-" + slug if slug[:1].isdigit() else slug
def list_all_video_ids() -> list[str]:
    """Return the sorted canonical ids of every ``*.mp4`` directly under raw/videos/."""
    video_ids = [filename_to_video_id(mp4.name) for mp4 in VIDEOS_DIR.glob("*.mp4")]
    video_ids.sort()
    return video_ids
def is_analyzed(video_id: str) -> bool:
    """Tell whether a video is ready for the downstream steps.

    Ready means both ``<video_id>.json`` in the analysis directory and
    ``<video_id>.md`` in the wiki videos directory exist.
    """
    analysis_json = ANALYSIS_DIR / f"{video_id}.json"
    if not analysis_json.exists():
        return False
    wiki_page = WIKI_VIDEOS_DIR / f"{video_id}.md"
    return wiki_page.exists()
def has_frames(video_id: str) -> bool:
    """Return True when the video's frames directory holds at least one ``.jpg``."""
    frames_dir = FRAMES_DIR / video_id
    if not frames_dir.exists():
        return False
    # Only the first match matters; stop globbing as soon as one is found.
    first_jpg = next(frames_dir.glob("*.jpg"), None)
    return first_jpg is not None
def has_case_images(video_id: str, want_codex: bool) -> bool:
    """Tell whether the case images for a video are already on disk.

    ``case-nanobanana.png`` is always required; ``case-codex.png`` is
    required only when *want_codex* is true.
    """
    image_dir = CASE_IMAGES_DIR / video_id
    required = ["case-nanobanana.png"]
    if want_codex:
        required.append("case-codex.png")
    return all((image_dir / filename).exists() for filename in required)
def run(cmd: list[str], description: str, *, timeout: float | None = 900) -> bool:
    """Run *cmd* as a child process and report whether it succeeded.

    The child inherits this process's stdout/stderr, so its output streams
    straight through to the console.

    Args:
        cmd: Argument vector to execute (no shell is involved).
        description: Short label used in the progress lines printed here.
        timeout: Seconds to wait before giving up on the child; ``None``
            waits indefinitely. Defaults to 900.

    Returns:
        True only when the child exits with status 0. False when it exits
        non-zero, exceeds *timeout*, or cannot be started at all.
    """
    print(f" [{now()}] → {description}", flush=True)
    try:
        completed = subprocess.run(cmd, timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        print(f" [{now()}] ✗ timeout on {description}", flush=True)
        return False
    except OSError as exc:
        # Raised when the executable cannot be launched (missing interpreter,
        # permission denied, ...). Treat it like any other failed step so one
        # broken launch does not abort the whole polling loop.
        print(f" [{now()}] ✗ could not start {description}: {exc}", flush=True)
        return False
    return completed.returncode == 0
def _case_images_satisfied(video_id: str, skip_codex: bool, skip_nano: bool) -> bool:
    """Tell whether every *requested* case image already exists for a video.

    An image whose generator is skipped is not required. Without this,
    ``--skip-nano`` would leave the video permanently "incomplete" if the
    Nano Banana image is never produced.
    """
    if not skip_nano:
        return has_case_images(video_id, want_codex=not skip_codex)
    if skip_codex:
        # Both generators are skipped: nothing is requested, nothing is missing.
        return True
    # Only the Codex image is requested.
    return (CASE_IMAGES_DIR / video_id / "case-codex.png").exists()


def process_one_pass(skip_codex: bool, skip_nano: bool) -> tuple[int, int, int]:
    """Run one pass of the downstream steps over every ready video.

    For each video that is analyzed, frames are extracted if none exist, and
    case images are generated if a requested image is missing. A video whose
    frame extraction fails is skipped for this pass.

    Args:
        skip_codex: Do not require or generate the Codex case image.
        skip_nano: Do not require or generate the Nano Banana case image.

    Returns:
        ``(videos_acted_on, total_ready, total_videos)`` where
        ``videos_acted_on`` counts videos for which at least one step ran
        successfully during this pass.
    """
    all_ids = list_all_video_ids()
    ready_ids = [v for v in all_ids if is_analyzed(v)]
    actions_done = 0

    for vid in ready_ids:
        did_anything = False

        # Step 1: frames
        if not has_frames(vid):
            cmd = ["python3", str(SCRIPTS / "09-extract-uap-frames.py"), "--video-id", vid]
            if not run(cmd, f"frames for {vid}"):
                continue  # don't proceed to case images if frames failed
            did_anything = True

        # Step 2: case images (only the ones that were actually requested)
        if not _case_images_satisfied(vid, skip_codex, skip_nano):
            cmd = [
                "python3", str(SCRIPTS / "11-generate-case-images.py"),
                "--kind", "videos",
                "--entity-id", vid,
            ]
            if skip_codex:
                cmd.append("--skip-codex")
            if skip_nano:
                cmd.append("--skip-nano")
            if run(cmd, f"case images for {vid}"):
                did_anything = True

        if did_anything:
            actions_done += 1

    return actions_done, len(ready_ids), len(all_ids)


def all_fully_processed(skip_codex: bool, skip_nano: bool = False) -> bool:
    """Tell whether every video is analyzed and has frames and case images.

    Args:
        skip_codex: Do not require the Codex case image.
        skip_nano: Do not require the Nano Banana case image.

    Returns:
        True when there is at least one video and every video is analyzed,
        has frames, and has all requested case images. False when there are
        no videos at all.
    """
    all_ids = list_all_video_ids()
    if not all_ids:
        return False
    return all(
        is_analyzed(v)
        and has_frames(v)
        and _case_images_satisfied(v, skip_codex, skip_nano)
        for v in all_ids
    )


def main():
    """Parse the command line, run the polling loop, and append a log entry.

    The loop stops after ``--once``, when everything is fully processed, when
    ``--max-iterations`` is reached, or on Ctrl+C. The log entry is written in
    all of those cases.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--interval", type=int, default=90, help="poll interval seconds (default 90)")
    ap.add_argument("--max-iterations", type=int, default=120, help="cap (120 × 90s = 3h)")
    ap.add_argument("--skip-codex", action="store_true")
    ap.add_argument("--skip-nano", action="store_true")
    ap.add_argument("--once", action="store_true", help="single pass, no loop")
    args = ap.parse_args()

    print(f"[{now()}] orchestrator started")
    print(f" interval={args.interval}s max_iterations={args.max_iterations}")
    print(f" skip_codex={args.skip_codex} skip_nano={args.skip_nano}")
    print(f" watching: {len(list_all_video_ids())} videos in raw/videos/")

    iteration = 0
    total_actions = 0
    try:
        while iteration < args.max_iterations:
            iteration += 1
            actions, ready, total = process_one_pass(args.skip_codex, args.skip_nano)
            total_actions += actions
            print(f"[{now()}] iter {iteration}: ready={ready}/{total}, "
                  f"actions_this_pass={actions}, total_actions={total_actions}", flush=True)
            if args.once:
                break
            if all_fully_processed(args.skip_codex, args.skip_nano):
                print(f"[{now()}] ✓ all {total} videos fully processed — exiting", flush=True)
                break
            if iteration >= args.max_iterations:
                # Last permitted pass is done; sleeping would only delay exit.
                break
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print(f"\n[{now()}] interrupted by user")

    # Log
    with open(LOG_PATH, "a", encoding="utf-8") as fh:
        fh.write(
            f"\n## {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')} — ORCHESTRATOR\n"
            f"- operator: archivist\n- script: scripts/12-incremental-orchestrator.py\n"
            f"- iterations: {iteration}\n- total_actions: {total_actions}\n"
        )
# Run the orchestrator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()