278 lines
10 KiB
Python
Executable file
278 lines
10 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
"""
|
||
07-test-agent.py — Minimal chat-agent CLI that validates the schema end-to-end
|
||
|
||
Simulates one chat-bubble round trip:
|
||
1. User asks a free-text query.
|
||
2. Agent walks wiki/ + case/ and collects relevant context markdowns.
|
||
3. Calls Claude Haiku (via claude CLI OAuth — same path as 02-vision-page.py)
|
||
with a system prompt that asks for STRUCTURED output:
|
||
|
||
{
|
||
"answer_en": "...",
|
||
"answer_pt_br": "...",
|
||
"citations": [
|
||
{
|
||
"kind": "page|crop|entity",
|
||
"page_id": "doc-id/pNNN", # for kind=page/crop
|
||
"entity_link": "[[loc/.../...]]", # for kind=entity
|
||
"png_url": "/static/png/doc-id/p-NNN.png",
|
||
"crop_url": "/static/crops/doc-id/CROP-ID.png", # if available
|
||
"bbox": {"x": .., "y": .., "w": .., "h": ..}, # if applicable
|
||
"snippet_en": "...",
|
||
"snippet_pt_br": "..."
|
||
}
|
||
]
|
||
}
|
||
|
||
4. Renders the JSON pretty-printed so the schema-to-UI contract is visible.
|
||
|
||
This is NOT the production agent — it's a smoke test that proves the wiki
|
||
schema carries everything the future chat UI will need (citations at page +
|
||
bbox, bilingual snippets, crop URLs).
|
||
|
||
Usage:
|
||
./07-test-agent.py "What UAP was observed in the Mediterranean?"
|
||
./07-test-agent.py --max-context 20 "How many redacted pages does D54 have?"
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
try:
|
||
import yaml
|
||
except ImportError:
|
||
sys.stderr.write("Missing pyyaml. Run: pip3 install pyyaml\n")
|
||
sys.exit(1)
|
||
|
||
|
||
# Corpus layout: every directory the script touches hangs off one root.
UFO_ROOT = Path("/Users/guto/ufo")
WIKI_BASE = UFO_ROOT.joinpath("wiki")
CASE_BASE = UFO_ROOT.joinpath("case")
PNG_BASE = UFO_ROOT.joinpath("processing", "png")
CROPS_BASE = UFO_ROOT.joinpath("processing", "crops")

# Values forwarded to the claude CLI (--model / --max-turns) and the
# default for the --max-context command-line option.
MODEL = "haiku"
MAX_TURNS = 3
DEFAULT_MAX_CONTEXT_FILES = 12

# Future server prefixes (placeholder; real server resolves these to actual paths)
PNG_URL_PREFIX = "/static/png"
CROP_URL_PREFIX = "/static/crops"
|
||
|
||
|
||
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = datetime.now(tz=timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
|
||
def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into ``(frontmatter, body)``.

    The file is read as UTF-8. Frontmatter is recognised only when the file
    starts with ``---`` and a later line also starts with ``---``; the text
    between the two fences is parsed with ``yaml.safe_load``.

    Args:
        path: Markdown file to read.

    Returns:
        A ``(frontmatter, body)`` tuple. ``frontmatter`` is always a dict:
        it is empty when the file has no frontmatter, when the YAML is
        invalid, or when the YAML parses to something other than a mapping.
        ``body`` is the text after the closing fence, or the whole file when
        no frontmatter was recognised.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}, text

    # The closing fence has to begin a line; a "---" that merely appears
    # inside a value or heading must not terminate the frontmatter.
    close = text.find("\n---", 3)
    if close == -1:
        return {}, text

    body = text[close + 4:]
    try:
        meta = yaml.safe_load(text[3:close].strip())
    except yaml.YAMLError:
        return {}, body

    # safe_load can yield None, a scalar or a list; callers rely on a dict.
    return (meta if isinstance(meta, dict) else {}), body
|
||
|
||
|
||
def tokenize(text: str) -> set[str]:
    """Return the distinct lower-cased words of *text*.

    A word is a run of at least three ASCII letters, digits, or accented
    Latin letters. A falsy *text* (``None`` or ``""``) yields an empty set.
    """
    words = re.findall(r"[a-zA-Z0-9À-ſ]{3,}", text or "")
    return set(map(str.lower, words))
|
||
|
||
|
||
def score_file(query_tokens: set[str], file_text: str, file_fm: dict) -> float:
    """Score a file by the fraction of query tokens it contains.

    The file's vocabulary is the tokens of its body plus the tokens of the
    ``canonical_name``, ``canonical_title`` and ``aliases`` frontmatter
    fields (each may be a string or a list; non-string list items are
    ignored).

    Returns:
        ``matched query tokens / number of query tokens`` as a float, or
        ``0.0`` when the file has no tokens at all.
    """
    vocabulary = tokenize(file_text)

    if file_fm:
        for field in ("canonical_name", "canonical_title", "aliases"):
            value = file_fm.get(field)
            # Treat a lone string and a list of strings uniformly.
            if isinstance(value, str):
                names = [value]
            elif isinstance(value, list):
                names = [item for item in value if isinstance(item, str)]
            else:
                names = []
            for name in names:
                vocabulary.update(tokenize(name))

    if not vocabulary:
        return 0.0

    matched = query_tokens.intersection(vocabulary)
    return len(matched) / max(1, len(query_tokens))
|
||
|
||
|
||
def gather_context(query: str, max_files: int) -> list[Path]:
    """Return the markdown files most relevant to *query*, best first.

    Every ``*.md`` file under ``WIKI_BASE`` and ``CASE_BASE`` is scored with
    ``score_file``; files scoring zero are dropped.

    Args:
        query: Free-text user question.
        max_files: Maximum number of paths to return. Zero or a negative
            value yields an empty list.

    Returns:
        Up to *max_files* paths ordered by descending score. Files with equal
        scores are ordered by path so the result does not depend on
        filesystem traversal order.
    """
    if max_files <= 0:
        return []

    q_tokens = tokenize(query)
    if not q_tokens:
        # No usable query tokens means every file would score 0.
        return []

    scored: list[tuple[float, Path]] = []
    for base in (WIKI_BASE, CASE_BASE):
        for p in base.rglob("*.md"):
            try:
                fm, body = read_md(p)
            except Exception:
                # Deliberately broad: this is a best-effort scan and a single
                # unreadable or malformed file must not abort it.
                continue
            # Guard the scorer against frontmatter that is not a mapping.
            score = score_file(q_tokens, body, fm if isinstance(fm, dict) else {})
            if score > 0:
                scored.append((score, p))

    scored.sort(key=lambda item: (-item[0], str(item[1])))
    return [p for _, p in scored[:max_files]]
|
||
|
||
|
||
def crop_url_for(image_id: str) -> str:
    """Return the URL for a crop image, or ``""`` when it cannot be resolved.

    Crop files are named ``<image_id>.png`` somewhere below ``CROPS_BASE``
    (ids look like ``IMG-DOCSHORT-pNNN-NN``, ``TBL-...``, ``SIG-...``). The
    document directory is not derivable from the id, so the tree is scanned.

    Args:
        image_id: Crop identifier. It may originate from model output, so it
            is validated before being used in a glob pattern.

    Returns:
        ``CROP_URL_PREFIX`` joined with the crop's path relative to
        ``CROPS_BASE`` (forward slashes), or ``""`` if the id is empty, not a
        string, contains path separators or glob metacharacters, or matches
        no file. When several files match, the first in sorted order is used.
    """
    if not isinstance(image_id, str) or not image_id:
        return ""
    # Path separators would escape the filename component (and an absolute
    # pattern makes rglob raise); wildcards would match unrelated crops.
    if any(ch in "/\\*?[]" for ch in image_id):
        return ""

    matches = sorted(CROPS_BASE.rglob(f"{image_id}.png"))
    if not matches:
        return ""

    rel = matches[0].relative_to(CROPS_BASE)
    return f"{CROP_URL_PREFIX}/{rel.as_posix()}"
|
||
|
||
|
||
def page_url_for(page_id: str) -> str:
    """Return the PNG URL for a page id, or ``""`` if the id is malformed.

    Args:
        page_id: Identifier of the form ``<doc-id>/pNNN`` (exactly three
            digits). Anything else, including a non-string value such as
            ``None``, is treated as malformed.

    Returns:
        ``<PNG_URL_PREFIX>/<doc-id>/p-NNN.png``, or ``""`` for malformed
        input.
    """
    if not isinstance(page_id, str):
        return ""
    # fullmatch, unlike a trailing "$", rejects an id ending in a newline.
    m = re.fullmatch(r"(.+)/p(\d{3})", page_id)
    if m is None:
        return ""
    doc_id, num = m.groups()
    return f"{PNG_URL_PREFIX}/{doc_id}/p-{num}.png"
|
||
|
||
|
||
def build_system_prompt() -> str:
    """Return the fixed system prompt describing the required JSON reply.

    The prompt tells the model to answer in English and Brazilian Portuguese,
    to cite only the files it was given, and to emit a single JSON object
    with ``answer_en``, ``answer_pt_br`` and ``citations``.
    """
    prompt = """You are a research assistant for the war.gov/ufo UAP/UFO document corpus.

The user asks a question. You receive a set of markdown files from a curated wiki (Karpathy-style LLM wiki) plus case-investigation artifacts. Each file's frontmatter carries strict provenance: doc_id, page_id, bbox coordinates, classifications, etc. Body text is bilingual (EN + PT-BR).

Your output MUST be a single JSON object with this exact shape (no markdown fence, no commentary, no preamble):

{
  "answer_en": "2-5 sentence English answer grounded in the provided files. Every factual claim must be traceable to a citation below.",
  "answer_pt_br": "Same answer translated to Brazilian Portuguese (pt-br). Use Brazilian vocabulary and spelling.",
  "citations": [
    {
      "kind": "page",
      "page_id": "doc-id/pNNN",
      "snippet_en": "short verbatim or near-verbatim excerpt supporting the claim (English)",
      "snippet_pt_br": "same in Brazilian Portuguese",
      "bbox": null
    },
    {
      "kind": "entity",
      "entity_link": "[[loc/aegean-sea-off-santorini-greece]] or similar wiki-link",
      "snippet_en": "...",
      "snippet_pt_br": "..."
    }
  ]
}

Rules:
- ONLY cite files that you were given. Do not invent page_ids or entity links.
- snippet_en and snippet_pt_br must be SHORT (1-2 sentences each).
- Brazilian Portuguese only for *_pt_br fields. Preserve UTF-8 accents.
- Verbatim quotes FROM the source documents stay in their original language (English) inside snippets — only the surrounding narrative is translated.
- If no file supports an answer, return: {"answer_en":"Insufficient evidence in corpus.","answer_pt_br":"Evidências insuficientes no corpus.","citations":[]}
- Output ONLY the JSON. No fence."""
    return prompt
|
||
|
||
|
||
def _parse_model_reply(text: str) -> dict:
    """Parse the model's reply text into a JSON object.

    A leading/trailing markdown fence is stripped first. If the text still
    does not parse, the span from the first ``{`` to the last ``}`` is tried
    so that stray prose around the object does not sink the whole run.

    Raises:
        json.JSONDecodeError: If no JSON can be recovered; the message
            includes the start of the reply to make the failure diagnosable.
        RuntimeError: If the reply is valid JSON but not an object.
    """
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
        cleaned = re.sub(r"\s*```$", "", cleaned)

    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError as err:
        start, end = cleaned.find("{"), cleaned.rfind("}")
        salvaged = cleaned[start:end + 1] if 0 <= start < end else ""
        try:
            parsed = json.loads(salvaged)
        except json.JSONDecodeError:
            raise json.JSONDecodeError(
                f"model reply is not valid JSON ({err.msg}); reply[:500]={cleaned[:500]!r}",
                err.doc,
                err.pos,
            ) from err

    if not isinstance(parsed, dict):
        raise RuntimeError(
            f"model reply is JSON but not an object (got {type(parsed).__name__})"
        )
    return parsed


def call_claude(user_prompt: str, system_prompt: str) -> dict:
    """Run one non-interactive ``claude`` CLI call and return the parsed reply.

    The CLI is invoked with ``-p``, the configured ``MODEL`` and
    ``MAX_TURNS``, JSON output, the ``Read`` tool allowed, ``UFO_ROOT`` added
    as a directory, and *system_prompt* appended to the system prompt. The
    user prompt is passed after ``--`` so it is never read as an option.

    Args:
        user_prompt: Prompt text sent as the user turn.
        system_prompt: Text passed via ``--append-system-prompt``.

    Returns:
        ``{"parsed": <model reply as dict>, "meta": {"duration_ms": ...,
        "total_cost_usd": ..., "session_id": ...}}``. Meta values are ``None``
        when the CLI payload does not carry them.

    Raises:
        RuntimeError: Non-zero exit status, empty stdout, stdout that is not
            a JSON object, an error flagged by the CLI, or a model reply that
            is not a JSON object.
        json.JSONDecodeError: The model reply text is not valid JSON.
        subprocess.TimeoutExpired: The CLI did not finish within 600 seconds.
        OSError: The ``claude`` executable could not be started.
    """
    cmd = [
        "claude",
        "-p",
        "--model", MODEL,
        "--output-format", "json",
        "--max-turns", str(MAX_TURNS),
        "--allowedTools", "Read",
        "--add-dir", str(UFO_ROOT),
        "--append-system-prompt", system_prompt,
        "--",
        user_prompt,
    ]
    res = subprocess.run(cmd, capture_output=True, text=True, timeout=600, check=False)
    if res.returncode != 0:
        raise RuntimeError(f"claude CLI failed (rc={res.returncode}): {res.stderr[-1000:]}")
    if not res.stdout.strip():
        raise RuntimeError(f"claude CLI returned empty stdout. stderr: {res.stderr[-1000:]}")

    try:
        cli = json.loads(res.stdout)
    except json.JSONDecodeError as e:
        raise RuntimeError(
            f"claude CLI returned invalid JSON: {e}. stdout[:500]={res.stdout[:500]!r}"
        ) from e
    if not isinstance(cli, dict):
        raise RuntimeError(
            f"claude CLI returned unexpected JSON ({type(cli).__name__}); expected an object"
        )

    if cli.get("is_error"):
        raise RuntimeError(f"claude reported error: {str(cli.get('result') or '')[:500]}")

    # "result" may be absent or null; treat both as an empty reply so the
    # failure surfaces as a parse error rather than an AttributeError.
    result = cli.get("result")
    if result is None:
        result = ""
    if not isinstance(result, str):
        raise RuntimeError(
            f"claude CLI 'result' field is not text (got {type(result).__name__})"
        )

    return {
        "parsed": _parse_model_reply(result),
        "meta": {
            "duration_ms": cli.get("duration_ms"),
            "total_cost_usd": cli.get("total_cost_usd"),
            "session_id": cli.get("session_id"),
        },
    }
|
||
|
||
|
||
def enrich_citations(parsed: dict) -> dict:
    """Add ``png_url`` / ``crop_url`` to citations, in place.

    * ``kind == "page"``: sets ``png_url`` from ``page_id`` (``""`` when the
      id is missing, not a string, or malformed).
    * ``kind == "crop"``: sets ``crop_url`` from ``crop_id`` when that is a
      non-empty string.

    The reply comes from a model, so its shape is not trusted: a missing or
    non-list ``citations`` value and non-dict entries are left untouched
    instead of raising.

    Args:
        parsed: Model reply; mutated in place.

    Returns:
        The same *parsed* object.
    """
    citations = parsed.get("citations")
    if not isinstance(citations, list):
        return parsed

    for cit in citations:
        if not isinstance(cit, dict):
            continue
        kind = cit.get("kind")
        if kind == "page":
            pid = cit.get("page_id")
            # JSON null or a number here must not reach the regex matcher.
            cit["png_url"] = page_url_for(pid) if isinstance(pid, str) else ""
        elif kind == "crop":
            crop_id = cit.get("crop_id")
            if isinstance(crop_id, str) and crop_id:
                cit["crop_url"] = crop_url_for(crop_id)
    return parsed
|
||
|
||
|
||
def main() -> None:
    """Run one query end-to-end and print the structured reply.

    Steps: parse the command line, pick context files with
    ``gather_context``, ask the model via ``call_claude`` to read them and
    answer, add URLs with ``enrich_citations``, and pretty-print the JSON.

    Exits with status 1 (message on stderr) if the model call or the
    enrichment fails, and with argparse's usage error if ``--max-context``
    is not a positive integer. When no context file matches, a canned
    "no relevant files" reply is printed and the model is not called.
    """
    ap = argparse.ArgumentParser(description="Minimal chat-agent smoke test for the UFO wiki.")
    ap.add_argument("query", help="user question (in English or PT-BR)")
    ap.add_argument("--max-context", type=int, default=DEFAULT_MAX_CONTEXT_FILES,
                    help=f"max number of markdown files to surface as context (default {DEFAULT_MAX_CONTEXT_FILES})")
    args = ap.parse_args()
    if args.max_context < 1:
        # Otherwise the run would misreport "no relevant files found".
        ap.error("--max-context must be a positive integer")

    print(f"Query: {args.query}\n", flush=True)
    print(f"Gathering context (max {args.max_context} files)...", flush=True)
    context_files = gather_context(args.query, args.max_context)
    for f in context_files:
        print(f" - {f.relative_to(UFO_ROOT)}", flush=True)
    if not context_files:
        print(" (no relevant files found)", flush=True)
        result = {"answer_en": "No relevant files found in the wiki.", "answer_pt_br": "Nenhum arquivo relevante encontrado.", "citations": []}
        print("\n" + json.dumps(result, indent=2, ensure_ascii=False))
        return

    # Build user prompt: list of file paths for the agent to Read
    file_list = "\n".join(f"- {p.relative_to(UFO_ROOT)}" for p in context_files)
    user_prompt = (
        f"User question:\n{args.query}\n\n"
        f"Read the following files from {UFO_ROOT}/ "
        f"(use the Read tool on each one as needed):\n{file_list}\n\n"
        "Then output the structured JSON answer per the system prompt."
    )

    print("\nCalling Haiku...", flush=True)
    try:
        out = call_claude(user_prompt, build_system_prompt())
        if not isinstance(out["parsed"], dict):
            raise RuntimeError("model reply is not a JSON object")
        parsed = enrich_citations(out["parsed"])
    except Exception as e:
        # Top-level boundary: report any failure as a one-line fatal error.
        sys.stderr.write(f"FATAL: {e}\n")
        sys.exit(1)

    # The meta keys are always present but may hold None, so a .get()
    # default would never apply; fall back explicitly before formatting.
    meta = out["meta"]
    cost_usd = meta.get("total_cost_usd") or 0
    latency_s = (meta.get("duration_ms") or 0) / 1000
    print(f"\n=== Agent reply (cost ${cost_usd:.4f}, "
          f"latency {latency_s:.1f}s) ===\n", flush=True)
    print(json.dumps(parsed, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
|