disclosure-bureau/scripts/12-incremental-orchestrator.py

201 lines
7.1 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
12-incremental-orchestrator.py — Wait-and-process loop
Polls for newly-completed Gemini video analyses, then runs the downstream
steps (frame extraction + case-images generation) per video. Idempotent by
construction — each step checks the output and skips if already done.
A video is "ready for downstream" when BOTH exist:
- processing/video-analysis/<id>.json (Gemini analysis output)
- wiki/videos/<id>.md (markdown rendered by script 08)
For each ready video, this loop will:
1. If no `processing/uap-frames/<id>/*.jpg` exists → run 09-extract-uap-frames.py
2. If `processing/case-images/<id>/case-nanobanana.png` is missing, OR
`processing/case-images/<id>/case-codex.png` is missing (not required with --skip-codex) → run 11-generate-case-images.py
(steps 1 and 2 always check their own outputs first; they never re-do work)
Termination:
- Stops when all videos in raw/videos/ are fully downstream-processed
- Or when --max-iterations is reached
- Or on SIGINT (Ctrl+C)
Usage:
./12-incremental-orchestrator.py # poll every 90s
./12-incremental-orchestrator.py --interval 60 # custom poll interval
./12-incremental-orchestrator.py --max-iterations 50
./12-incremental-orchestrator.py --skip-codex # only Nano Banana case images
./12-incremental-orchestrator.py --once # single pass, no loop
"""
from __future__ import annotations
import argparse
import re
import subprocess
import sys
import time
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
# Absolute root of the working tree; every other location is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")
# Source clips (*.mp4) that the orchestrator watches.
VIDEOS_DIR = UFO_ROOT.joinpath("raw", "videos")
# Per-video analysis JSON files (<id>.json).
ANALYSIS_DIR = UFO_ROOT.joinpath("processing", "video-analysis")
# Per-video markdown pages (<id>.md).
WIKI_VIDEOS_DIR = UFO_ROOT.joinpath("wiki", "videos")
# Per-video directories of extracted *.jpg frames.
FRAMES_DIR = UFO_ROOT.joinpath("processing", "uap-frames")
# Per-video directories holding the generated case images.
CASE_IMAGES_DIR = UFO_ROOT.joinpath("processing", "case-images")
# Directory containing the downstream step scripts launched by this file.
SCRIPTS = UFO_ROOT.joinpath("scripts")
# Markdown log that receives one entry per orchestrator run.
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")
def now() -> str:
    """Return the current UTC wall-clock time formatted as ``HH:MM:SS``."""
    stamp = datetime.now(tz=timezone.utc)
    return f"{stamp:%H:%M:%S}"
def filename_to_video_id(name: str) -> str:
    """Derive the canonical video id (a lowercase ASCII slug) from a file name.

    The text after the last ``.`` is dropped, accents are removed via NFKD
    decomposition, everything is lower-cased, and every character outside
    ``a-z``, ``0-9`` and ``-`` becomes a hyphen. Runs of hyphens are collapsed
    and leading/trailing hyphens are trimmed. Ids that would begin with a
    digit are prefixed with ``vid-``. The result may be an empty string.
    """
    # Keep everything before the last dot; names without a dot are used whole.
    dot = name.rfind(".")
    stem = name if dot == -1 else name[:dot]

    # Decompose accented characters, then discard the combining marks.
    decomposed = unicodedata.normalize("NFKD", stem)
    unaccented = "".join(ch for ch in decomposed if not unicodedata.combining(ch))

    slug = re.sub(r"[^a-z0-9-]", "-", unaccented.lower())
    slug = re.sub(r"-+", "-", slug).strip("-")

    # slug[:1] is "" for an empty slug, and "".isdigit() is False.
    return "vid-" + slug if slug[:1].isdigit() else slug
def list_all_video_ids() -> list[str]:
    """Return the sorted canonical ids of every ``*.mp4`` file in VIDEOS_DIR."""
    ids = [filename_to_video_id(clip.name) for clip in VIDEOS_DIR.glob("*.mp4")]
    ids.sort()
    return ids
def is_analyzed(video_id: str) -> bool:
    """Tell whether *video_id* is ready for the downstream steps.

    Ready means both artefacts exist: ``<id>.json`` under ANALYSIS_DIR and
    ``<id>.md`` under WIKI_VIDEOS_DIR.
    """
    analysis_json = ANALYSIS_DIR / f"{video_id}.json"
    if not analysis_json.exists():
        return False
    wiki_page = WIKI_VIDEOS_DIR / f"{video_id}.md"
    return wiki_page.exists()
def has_frames(video_id: str) -> bool:
    """Return True when the video's frame directory holds at least one ``*.jpg``."""
    frame_dir = FRAMES_DIR / video_id
    if not frame_dir.exists():
        return False
    # Only the first match matters; stop scanning as soon as one is found.
    first_jpg = next(frame_dir.glob("*.jpg"), None)
    return first_jpg is not None
def has_case_images(video_id: str, want_codex: bool) -> bool:
    """Return True when the required case images for *video_id* already exist.

    ``case-nanobanana.png`` must always be present. ``case-codex.png`` is
    additionally required only when *want_codex* is true.

    NOTE(review): the Nano Banana image is required unconditionally, so a run
    that skips Nano Banana generation can never satisfy this check unless the
    file already exists — confirm that this is intended.
    """
    image_dir = CASE_IMAGES_DIR / video_id
    if not (image_dir / "case-nanobanana.png").exists():
        return False
    return (not want_codex) or (image_dir / "case-codex.png").exists()
def run(cmd: list[str], description: str, timeout: float = 900) -> bool:
    """Run *cmd* as a child process and report whether it succeeded.

    The child inherits this process's stdout/stderr, so its output streams
    straight through.

    Args:
        cmd: Argument vector handed to ``subprocess.run`` (no shell involved).
        description: Short label used in the progress and failure messages.
        timeout: Seconds to wait before giving up on the child (default 900).

    Returns:
        True only when the child exits with status 0. A timeout, a non-zero
        exit status, or a failure to launch the command all return False
        instead of raising, so one bad step cannot abort the polling loop.
    """
    print(f" [{now()}] → {description}", flush=True)
    try:
        res = subprocess.run(cmd, timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        print(f" [{now()}] ✗ timeout on {description}", flush=True)
        return False
    except OSError as exc:
        # Raised when the executable cannot be started (missing, not executable, ...).
        print(f" [{now()}] ✗ could not start {description}: {exc}", flush=True)
        return False
    if res.returncode != 0:
        print(f" [{now()}] ✗ exit code {res.returncode} on {description}", flush=True)
        return False
    return True
def process_one_pass(skip_codex: bool, skip_nano: bool) -> tuple[int, int, int]:
    """Run the downstream steps once for every video that is ready.

    For each analyzed video: extract frames if none exist, then generate case
    images if the required ones are missing. Case images are not attempted in
    this pass when the frame step was run and failed.

    Args:
        skip_codex: Forward ``--skip-codex`` to the case-image script and do
            not require the codex image when checking for existing output.
        skip_nano: Forward ``--skip-nano`` to the case-image script.

    Returns:
        ``(videos_touched, ready_count, total_count)`` where *videos_touched*
        counts the videos for which at least one step ran successfully.
    """
    every_id = list_all_video_ids()
    ready = [vid for vid in every_id if is_analyzed(vid)]

    frames_script = str(SCRIPTS / "09-extract-uap-frames.py")
    images_script = str(SCRIPTS / "11-generate-case-images.py")

    # Optional flags passed through to the case-image script, in fixed order.
    image_flags: list[str] = []
    if skip_codex:
        image_flags.append("--skip-codex")
    if skip_nano:
        image_flags.append("--skip-nano")

    touched = 0
    for vid in ready:
        # Step 1: frames
        frames_ran = False
        if not has_frames(vid):
            frames_ran = run(
                ["python3", frames_script, "--video-id", vid],
                f"frames for {vid}",
            )
            if not frames_ran:
                # Frame extraction failed: leave case images for a later pass.
                continue

        # Step 2: case images
        images_ran = False
        if not has_case_images(vid, want_codex=not skip_codex):
            images_ran = run(
                ["python3", images_script, "--kind", "videos", "--entity-id", vid, *image_flags],
                f"case images for {vid}",
            )

        if frames_ran or images_ran:
            touched += 1

    return touched, len(ready), len(every_id)
def all_fully_processed(skip_codex: bool) -> bool:
    """Return True when every listed video is analyzed and has frames and case images.

    An empty video list counts as "not done" and yields False. The codex case
    image is required only when *skip_codex* is false.
    """
    video_ids = list_all_video_ids()
    if not video_ids:
        return False
    need_codex = not skip_codex
    return all(
        is_analyzed(vid)
        and has_frames(vid)
        and has_case_images(vid, want_codex=need_codex)
        for vid in video_ids
    )
def main() -> None:
    """Parse the command line, run the polling loop, and append a run summary to the log.

    The loop runs one processing pass per iteration and stops when ``--once``
    is set, when every video is fully processed, when ``--max-iterations`` is
    reached, or on Ctrl+C. It sleeps ``--interval`` seconds between passes.
    A summary entry is appended to LOG_PATH on every exit path that reaches
    the end of the function, including a keyboard interrupt.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--interval", type=int, default=90, help="poll interval seconds (default 90)")
    ap.add_argument("--max-iterations", type=int, default=120, help="cap (120 × 90s = 3h)")
    ap.add_argument("--skip-codex", action="store_true")
    ap.add_argument("--skip-nano", action="store_true")
    ap.add_argument("--once", action="store_true", help="single pass, no loop")
    args = ap.parse_args()
    if args.interval < 0:
        # time.sleep() rejects negative values; fail at startup rather than mid-run.
        ap.error("--interval must be >= 0")

    print(f"[{now()}] orchestrator started")
    print(f" interval={args.interval}s max_iterations={args.max_iterations}")
    print(f" skip_codex={args.skip_codex} skip_nano={args.skip_nano}")
    print(f" watching: {len(list_all_video_ids())} videos in raw/videos/")

    iteration = 0
    total_actions = 0
    try:
        while iteration < args.max_iterations:
            iteration += 1
            actions, ready, total = process_one_pass(args.skip_codex, args.skip_nano)
            total_actions += actions
            print(f"[{now()}] iter {iteration}: ready={ready}/{total}, "
                  f"actions_this_pass={actions}, total_actions={total_actions}", flush=True)
            if args.once:
                break
            if all_fully_processed(args.skip_codex):
                print(f"[{now()}] ✓ all {total} videos fully processed — exiting", flush=True)
                break
            if iteration >= args.max_iterations:
                # Budget exhausted: exit now instead of sleeping one more interval for nothing.
                print(f"[{now()}] max iterations ({args.max_iterations}) reached — exiting", flush=True)
                break
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print(f"\n[{now()}] interrupted by user")

    # Log
    with open(LOG_PATH, "a", encoding="utf-8") as fh:
        fh.write(
            f"\n## {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')} — ORCHESTRATOR\n"
            f"- operator: archivist\n- script: scripts/12-incremental-orchestrator.py\n"
            f"- iterations: {iteration}\n- total_actions: {total_actions}\n"
        )


if __name__ == "__main__":
    main()