disclosure-bureau/scripts/reextract/run_parallel.sh

98 lines
2.5 KiB
Bash
Raw Normal View History

#!/usr/bin/env bash
# Parallel re-extraction orchestrator.
#
# - Lists every doc that has raw/<doc>--subagent/_index.json
# - Skips docs whose _reextract.json already exists and parses as JSON (idempotent)
# - Uses an mkdir-based per-doc lock to prevent two workers from racing
# - Runs N workers in parallel (default 4, override via WORKERS=N)
# - Logs each doc to raw/<doc>--subagent/_reextract.log
#
# Run:
# ./run_parallel.sh # all docs, 4 workers
# WORKERS=8 ./run_parallel.sh # 8 workers
# ./run_parallel.sh DOC1 DOC2 # specific docs only
# Unset variables are errors and pipelines report the first failing stage.
# Note that -e is not enabled, so the script keeps going after a failing command.
set -uo pipefail

# Project root. Defaults to the original hard-coded location; export UFO=/path
# to run against a checkout that lives somewhere else.
UFO="${UFO:-/Users/guto/ufo}"
RAW="$UFO/raw"
RUN="$UFO/scripts/reextract/run.py"

# Degree of parallelism handed to `xargs -P`. Reject anything that is not a
# plain non-negative integer here instead of letting xargs fail later.
WORKERS="${WORKERS:-4}"
case "$WORKERS" in
  *[!0-9]*)
    printf 'WORKERS must be a non-negative integer, got: %s\n' "$WORKERS" >&2
    exit 2
    ;;
esac

# Build list of doc IDs
if [ "$#" -gt 0 ]; then
  # Explicit doc IDs on the command line take precedence over discovery.
  DOCS=("$@")
else
  DOCS=()
  for d in "$RAW"/*--subagent; do
    # Only folders that carry an _index.json are queued. This test also drops
    # the literal pattern that the loop sees when the glob matches nothing.
    [ -f "$d/_index.json" ] || continue
    # Strip the directory part and the "--subagent" suffix without forking.
    doc_id="${d##*/}"
    DOCS+=("${doc_id%--subagent}")
  done
fi

echo "=== Re-extract orchestrator ==="
echo " docs queued: ${#DOCS[@]}"
echo " workers: $WORKERS"
echo ""
#######################################
# Re-extract one document under a per-document mkdir lock.
# Globals:
#   RAW (read) - directory containing the <doc>--subagent folders
#   RUN (read) - Python script invoked as: python3 "$RUN" <doc_id>
# Arguments:
#   $1 - document id (the folder name without its "--subagent" suffix)
# Outputs:
#   Status lines on stdout: [SKIP], [LOCK], or [BEGIN] followed by [OK]/[FAIL].
#   The extractor's stdout and stderr are written to <folder>/_reextract.log.
# Returns:
#   0 when the doc was skipped, is locked by someone else, or extracted fine;
#   1 when the doc folder does not exist or the extractor exited non-zero.
#######################################
process_one() {
  local doc_id="$1"
  local sub="$RAW/$doc_id--subagent"
  local out="$sub/_reextract.json"
  local log="$sub/_reextract.log"
  local lock="$sub/.reextract.lock"

  # Without this check a mistyped doc id makes the mkdir below fail, and the
  # failure would be reported as "[LOCK] ... (another worker has it)".
  if [ ! -d "$sub" ]; then
    echo "[FAIL] $doc_id (no such directory: $sub)"
    return 1
  fi

  # Skip if already extracted. The file must also parse as JSON; otherwise we
  # fall through and extract again. The path travels as argv instead of being
  # spliced into the Python source, so quotes or backslashes in it cannot
  # break the snippet.
  if [ -f "$out" ] \
    && python3 -c 'import json, sys; json.load(open(sys.argv[1]))' "$out" 2>/dev/null; then
    echo "[SKIP] $doc_id (already extracted)"
    return 0
  fi

  # Acquire lock via mkdir (atomic)
  if ! mkdir "$lock" 2>/dev/null; then
    echo "[LOCK] $doc_id (another worker has it)"
    return 0
  fi

  # Release the lock if the shell exits while the extractor is running.
  # %q shell-quotes the path so the trap command stays well-formed even when
  # the path itself contains quote characters.
  local quoted_lock
  printf -v quoted_lock '%q' "$lock"
  trap "rmdir $quoted_lock 2>/dev/null || true" EXIT

  local started
  started=$(date +%s)
  echo "[BEGIN] $doc_id"

  local status=0
  python3 "$RUN" "$doc_id" > "$log" 2>&1 || status=$?
  local elapsed=$(($(date +%s) - started))

  if [ "$status" -eq 0 ]; then
    echo "[OK] $doc_id (${elapsed}s)"
  else
    echo "[FAIL] $doc_id (${elapsed}s) — see $log"
  fi

  # Normal path: drop the lock ourselves and disarm the exit handler.
  rmdir "$lock" 2>/dev/null || true
  trap - EXIT

  [ "$status" -eq 0 ] || return 1
  return 0
}
# The workers are separate `bash -c` processes started by xargs, so the
# function and the paths it reads have to be exported to reach them.
export -f process_one
export RAW RUN
# Run in parallel via xargs
# - IDs are NUL-delimited (-0) so spaces, quotes and backslashes in a doc ID
#   reach the worker unchanged instead of going through xargs' own quoting.
# - -n 1 appends exactly one ID per invocation, which becomes $1 of the
#   `bash -c` script ("_" fills $0). -I is not used: it conflicts with -n.
# - Nothing is started for an empty queue; that also avoids expanding an
#   empty array under `set -u`, which older bash versions treat as an error.
if [ "${#DOCS[@]}" -gt 0 ]; then
  printf '%s\0' "${DOCS[@]}" \
    | xargs -0 -n 1 -P "$WORKERS" bash -c 'process_one "$@"' _
fi
# Final tally: a doc counts as OK only if its _reextract.json exists and
# parses as JSON; everything else (missing or unparseable) counts as FAIL.
# Docs that were skipped because they were already extracted count as OK.
echo ""
echo "=== Done. Summary: ==="
ok=0
fail=0
# The ${DOCS[@]+...} form expands to nothing for an empty queue without
# tripping `set -u` on older bash versions.
for d in ${DOCS[@]+"${DOCS[@]}"}; do
  out="$RAW/$d--subagent/_reextract.json"
  # The path is handed to Python as argv rather than pasted into the source,
  # so a quote or backslash in it cannot turn a valid file into a "FAIL".
  if [ -f "$out" ] \
    && python3 -c 'import json, sys; json.load(open(sys.argv[1]))' "$out" 2>/dev/null; then
    ok=$((ok + 1))
  else
    fail=$((fail + 1))
  fi
done
echo " OK: $ok"
echo " FAIL: $fail"