disclosure-bureau/scripts/16-extract-table-csv.py

391 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
16-extract-table-csv.py — Row-by-row extraction of multi-page tables → CSV
For each `wiki/tables/<TBL-id>.md`:
1. Resolve each span's PNG path (processing/png/<doc-id>/p-NNN.png)
2. Crop the table region using bbox (Pillow)
3. Send all crops in order to Haiku with a prompt to extract the full table
preserving multi-page row continuity
4. Receive JSON: { headers: [...], rows: [[...], ...] }
5. Save:
- processing/tables/<TBL-id>.csv (extracted CSV)
- processing/tables/<TBL-id>.json (raw extraction + metadata)
- processing/table-crops/<TBL-id>/ (the crop JPGs for inspection)
- Update wiki/tables/<TBL-id>.md frontmatter:
csv_path, extraction_quality, headers, row_count_extracted,
extracted_at, extraction_model
Idempotent: skip if CSV exists and not --force.
Usage:
./16-extract-table-csv.py # all multi-page tables
./16-extract-table-csv.py --table-id <id> # single
./16-extract-table-csv.py --force # re-extract
(There is no --model flag: only --table-id and --force are parsed; the model is fixed by DEFAULT_MODEL, "haiku".)
"""
from __future__ import annotations
import argparse
import csv
import json
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
try:
import yaml
except ImportError:
sys.stderr.write("Missing pyyaml. pip3 install pyyaml\n")
sys.exit(1)
try:
from PIL import Image
except ImportError:
sys.stderr.write("Missing pillow. pip3 install pillow\n")
sys.exit(1)
# Absolute project root; every other path constant below is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")
# Directory holding the per-table markdown files (wiki/tables/<TBL-id>.md).
TABLES_BASE = UFO_ROOT / "wiki" / "tables"
# Page renders, looked up as <doc-id>/p-NNN.png by resolve_page_png().
PNG_BASE = UFO_ROOT / "processing" / "png"
# Output directory for the extracted <TBL-id>.csv / <TBL-id>.json files.
CSV_BASE = UFO_ROOT / "processing" / "tables"
# Parent of the per-table directories that receive the cropped JPEGs.
CROPS_BASE = UFO_ROOT / "processing" / "table-crops"
# Run log; main() appends one entry when at least one table was extracted.
LOG_PATH = UFO_ROOT / "wiki" / "log.md"
# Model name passed to `claude --model`.
DEFAULT_MODEL = "haiku"
# Value passed to `claude --max-turns`.
MAX_TURNS = 4
# Timeout in seconds given to subprocess.run() for one `claude` invocation.
DEFAULT_TIMEOUT = 240
# Prompt template for the extraction call. call_haiku_extract() fills the
# {n_crops}, {n_pages} and {crop_list} placeholders via str.format(), which is
# why the literal braces of the JSON schema below are doubled ({{ and }}).
EXTRACT_PROMPT = """You are extracting a multi-page table from a US Department of War declassified UAP document.
You will see {n_crops} image crops in order. They represent ONE logical table split across {n_pages} consecutive pages. The first crop is the start, the last is the end, and any middle ones continue the rows.
STEPS:
1. Use the Read tool on EACH of these crop image paths, IN ORDER:
{crop_list}
2. Identify the column headers (typically only on the first page; subsequent pages may repeat headers skip those repeats).
3. Concatenate all rows from all pages into a single ordered list. A row that visually appears to span a page break (e.g. a cell continues onto the next page) should be merged into ONE row when possible.
4. Output ONE JSON object (no fence, no preamble) with this exact schema:
{{
"headers": ["col1", "col2", ...],
"rows": [
["row1_col1_value", "row1_col2_value", ...],
["row2_col1_value", "row2_col2_value", ...]
],
"row_count": <int total rows extracted, excluding header repeats>,
"column_count": <int number of columns>,
"headers_repeat_on_each_page": true|false,
"merged_cross_page_rows": <int how many rows you merged across page breaks>,
"extraction_quality": <float 0..1 your confidence the extraction is complete and accurate>,
"notes": "Any caveats: illegible cells, redactions inside cells, merged headers, ambiguous values, etc. Use 'REDACTED' for cell values that are blacked out, and '???' for illegible content."
}}
RULES:
- Preserve ORIGINAL LANGUAGE of all cell text. Do NOT translate.
- For redacted cells: "REDACTED" or "REDACTED (1.4(a))" if the code is visible.
- For illegible cells: "???".
- For empty cells: empty string "".
- If a cell contains a list (multiple values), preserve as comma-separated.
- Numbers stay as strings (preserve formatting like "24,989" or "1319Z").
- Headers should be short, snake_case-friendly (e.g. "incident_date", "shape", "altitude_ft").
- Output ONLY the JSON. No fence, no commentary."""
def utc_now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
def read_md(path: Path) -> tuple[dict, str]:
    """Split a markdown file into its YAML front matter and its body.

    The file has front matter when it starts with ``---`` and a later line
    consists solely of ``---``. The closing delimiter must sit on a line of
    its own: a ``---`` embedded in a YAML value (for example inside free-text
    extraction notes) is not treated as the end of the front matter.

    Args:
        path: Markdown file to read (UTF-8).

    Returns:
        ``(front_matter, body)``. ``front_matter`` is ``{}`` when the file has
        no front matter, when the closing delimiter is missing, when the YAML
        does not parse, or when it parses to something other than a mapping.
        ``body`` is the text after the closing delimiter with leading newlines
        removed, or the whole file when no front matter was recognised.
    """
    content = path.read_text(encoding="utf-8")
    if not content.startswith("---"):
        return {}, content

    # Start at offset 3 so the opening delimiter cannot match itself; with
    # MULTILINE, ``^`` only matches at real line starts.
    closing = re.compile(r"^---[ \t\r]*$", re.MULTILINE).search(content, 3)
    if closing is None:
        return {}, content

    body = content[closing.end():].lstrip("\n")
    try:
        loaded = yaml.safe_load(content[3:closing.start()].strip())
    except yaml.YAMLError:
        return {}, body
    # Callers use fm.get(...), so a scalar or list document (or None for an
    # empty block) is reported as "no front matter".
    return (loaded if isinstance(loaded, dict) else {}), body
def write_md(path: Path, fm: dict, body: str) -> bool:
    """Write front matter plus body to ``path`` unless the file already matches.

    The front matter is dumped as block-style YAML in insertion order and
    wrapped in ``---`` lines. One blank line separates it from the body unless
    the body already begins with a newline.

    Returns:
        True when the file was (re)written, False when the existing content
        was already identical.
    """
    front_matter = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    spacer = "" if body.startswith("\n") else "\n"
    rendered = f"---\n{front_matter}---\n{spacer}{body}"

    if path.exists():
        on_disk = path.read_text(encoding="utf-8")
        if on_disk == rendered:
            return False

    path.write_text(rendered, encoding="utf-8")
    return True
def resolve_page_png(page_link: str) -> Path | None:
"""[[doc-id/p059]] → /Users/guto/ufo/processing/png/doc-id/p-059.png"""
m = re.match(r"\[\[([a-z0-9-]+)/p(\d+)\]\]", page_link)
if not m:
return None
doc_id = m.group(1)
page_num = int(m.group(2))
png = PNG_BASE / doc_id / f"p-{page_num:03d}.png"
return png if png.exists() else None
def crop_table_region(png_path: Path, bbox: dict, out_path: Path, padding: float = 0.005) -> bool:
    """Cut the bbox region out of a page PNG and store it as a JPEG.

    ``bbox`` carries ``x``, ``y``, ``w``, ``h`` as fractions of the page size
    (they are multiplied by the image width/height below). The box is grown by
    ``padding`` on each side and clamped to the page.

    Returns:
        True when the crop was written. False when the padded box has no area
        or when any step raised; in the latter case the error is reported on
        stderr.
    """
    try:
        with Image.open(png_path) as page:
            page_w, page_h = page.size

            # Fractional box, padded and clamped to the [0, 1] page extent.
            left = max(0.0, float(bbox.get("x", 0)) - padding)
            top = max(0.0, float(bbox.get("y", 0)) - padding)
            frac_w = min(1.0 - left, float(bbox.get("w", 0)) + 2 * padding)
            frac_h = min(1.0 - top, float(bbox.get("h", 0)) + 2 * padding)
            if frac_w <= 0 or frac_h <= 0:
                return False

            # Pixel box; width and height are at least one pixel.
            x0 = int(round(left * page_w))
            y0 = int(round(top * page_h))
            x1 = x0 + max(1, int(round(frac_w * page_w)))
            y1 = y0 + max(1, int(round(frac_h * page_h)))
            region = page.crop((x0, y0, x1, y1))

            out_path.parent.mkdir(parents=True, exist_ok=True)
            # Convert anything that is not already RGB before saving as JPEG.
            if region.mode != "RGB":
                region = region.convert("RGB")
            region.save(out_path, "JPEG", quality=92)
            return True
    except Exception as e:
        sys.stderr.write(f" ✗ crop failed: {e}\n")
        return False
def call_haiku_extract(crops: list[Path], n_pages: int) -> tuple[dict | None, str]:
"""Call Haiku via claude CLI with the crops and structured-output prompt."""
crop_list = "\n".join(f" {i+1}. {str(p)}" for i, p in enumerate(crops))
prompt = EXTRACT_PROMPT.format(n_crops=len(crops), n_pages=n_pages, crop_list=crop_list)
cmd = [
"claude", "-p",
"--model", DEFAULT_MODEL,
"--output-format", "json",
"--max-turns", str(MAX_TURNS),
"--allowedTools", "Read",
"--add-dir", str(crops[0].parent),
"--",
prompt,
]
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=DEFAULT_TIMEOUT, check=False)
except subprocess.TimeoutExpired:
return None, "timeout"
if res.returncode != 0:
return None, f"rc={res.returncode}: {res.stderr[-300:]}"
try:
cli = json.loads(res.stdout)
except json.JSONDecodeError:
return None, "cli-stdout-not-json"
if cli.get("is_error"):
return None, "is_error"
text = (cli.get("result") or "").strip()
parsed, err = robust_json_parse(text)
if parsed is not None:
return parsed, ""
return None, f"result-not-json: {err}"
def robust_json_parse(text: str) -> tuple[dict | None, str]:
"""Parse JSON tolerant of fences, trailing commentary, unbalanced edges.
Strategy:
1. Strip ``` fences.
2. Try direct json.loads.
3. Find first balanced { ... } block and parse it.
4. As a last resort: rewrite typical Haiku gotchas (smart quotes, trailing
comma before }, unescaped newlines inside strings).
"""
t = text.strip()
t = re.sub(r"^```(?:json)?\s*", "", t)
t = re.sub(r"\s*```$", "", t)
try:
return json.loads(t), ""
except json.JSONDecodeError as e:
first_err = str(e)
# Find balanced { ... }
start = t.find("{")
if start >= 0:
depth = 0
for i in range(start, len(t)):
if t[i] == "{":
depth += 1
elif t[i] == "}":
depth -= 1
if depth == 0:
cand = t[start:i + 1]
try:
return json.loads(cand), ""
except json.JSONDecodeError:
break
# Final pass: remove trailing commas before } or ]
cleaned = re.sub(r",\s*([}\]])", r"\1", t)
try:
return json.loads(cleaned), ""
except json.JSONDecodeError:
return None, first_err
def save_csv(out_csv: Path, headers: list[str], rows: list[list]) -> None:
    """Write ``headers`` and ``rows`` to ``out_csv`` as UTF-8 CSV.

    The parent directory is created when missing. Every data row is forced to
    the header width: short rows are padded with empty strings and long rows
    are truncated.
    """
    width = len(headers)
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(headers)
        for record in rows:
            cells = list(record)
            shortfall = width - len(record)
            if shortfall > 0:
                cells.extend([""] * shortfall)
            writer.writerow(cells[:width])
def render_table_md_body(table_id: str, fm: dict, parsed: dict | None) -> str:
spans = fm.get("spans_pages") or []
body = f"# {fm.get('canonical_title', table_id)}\n\n"
body += f"> Multi-page table spanning {len(spans)} pages of {fm.get('source_doc','')}\n\n"
body += "## Pages\n\n"
for sp in spans:
body += f"- {sp.get('role','?')}: {sp.get('page','')} · bbox {sp.get('bbox')}\n"
body += "\n"
if parsed:
headers = parsed.get("headers") or []
rows = parsed.get("rows") or []
body += f"## Extracted data ({parsed.get('row_count', len(rows))} rows × {len(headers)} cols)\n\n"
body += f"_Extraction quality: `{parsed.get('extraction_quality')}` · "
body += f"merged cross-page rows: {parsed.get('merged_cross_page_rows', 0)} · "
body += f"CSV: `{fm.get('csv_path')}`_\n\n"
if parsed.get("notes"):
body += f"> **Notes from extraction:** {parsed['notes']}\n\n"
if headers and rows:
body += "| " + " | ".join(headers) + " |\n"
body += "|" + "|".join(["---"] * len(headers)) + "|\n"
for row in rows[:50]:
cells = [str(c).replace("|", "\\|").replace("\n", " ") for c in row]
# pad
cells = cells + [""] * (len(headers) - len(cells))
body += "| " + " | ".join(cells[: len(headers)]) + " |\n"
if len(rows) > 50:
body += f"\n_(showing first 50 of {len(rows)} rows — full CSV in `{fm.get('csv_path')}`)_\n"
else:
body += "## Extracted data\n\n_Extraction not yet run or failed. Run `scripts/16-extract-table-csv.py`._\n"
body += "\n## Notes\n\nPer-page table snippets live in each page.md's `tables_detected[]`. Full row-by-row data is in the CSV at `csv_path`.\n"
return body
def process_table(md_path: Path, force: bool) -> bool:
    """Extract one multi-page table to CSV/JSON and refresh its markdown page.

    Steps:
        1. Read the front matter. Pages that are not ``type: table``, are not
           ``multi_page`` or have fewer than two spans are skipped.
        2. Skip when both the CSV and the JSON already exist, unless ``force``.
        3. Crop every span from its page PNG (existing crops are reused unless
           ``force``).
        4. Ask the model for the table, with up to three attempts.
        5. Write the CSV and the raw JSON, then rewrite the markdown page with
           updated front matter and a freshly rendered body.

    Args:
        md_path: Path of the table's markdown page.
        force: Redo crops and extraction even when outputs already exist.

    Returns:
        True when the table was extracted and written; False when it was
        skipped or any step failed.
    """
    max_attempts = 3

    fm, _ = read_md(md_path)
    if fm.get("type") != "table":
        return False
    if not fm.get("multi_page"):
        return False  # single-page tables stay inline

    table_id = fm.get("table_id") or md_path.stem
    csv_path = CSV_BASE / f"{table_id}.csv"
    json_path = CSV_BASE / f"{table_id}.json"
    crops_dir = CROPS_BASE / table_id
    if csv_path.exists() and json_path.exists() and not force:
        return False

    spans = fm.get("spans_pages") or []
    if len(spans) < 2:
        return False

    # Separator added: the id and the page count used to be printed run together.
    print(f"\n=== {table_id} — {len(spans)} pages ===", flush=True)

    crops: list[Path] = []
    for idx, sp in enumerate(spans, start=1):
        page_link = sp.get("page", "")
        bbox = sp.get("bbox") or {}
        png = resolve_page_png(page_link)
        if not png:
            sys.stderr.write(f" ✗ no PNG for {page_link}\n")
            return False
        crop_out = crops_dir / f"span-{idx:02d}.jpg"
        if not crop_out.exists() or force:
            if not crop_table_region(png, bbox, crop_out):
                return False
        crops.append(crop_out)
        print(f" ✓ crop {idx}: {crop_out.name}", flush=True)

    t0 = time.time()
    parsed = None
    err = ""
    for attempt in range(1, max_attempts + 1):
        print(
            f" → calling Haiku (attempt {attempt}/{max_attempts}) to extract CSV from {len(crops)} crops…",
            flush=True,
        )
        parsed, err = call_haiku_extract(crops, n_pages=len(spans))
        if parsed:
            break
        print(f" · attempt {attempt} failed: {err[:120]}", flush=True)
        # Back off only when another attempt follows; sleeping after the last
        # failure would just delay the error report.
        if attempt < max_attempts:
            time.sleep(4 * attempt)
    elapsed = time.time() - t0

    if not parsed or not isinstance(parsed, dict):
        print(f" ✗ extraction failed after {max_attempts} attempts ({elapsed:.1f}s): {err}", flush=True)
        return False

    headers = parsed.get("headers") or []
    rows = parsed.get("rows") or []
    # The model output is untrusted: both must be non-empty lists before they
    # are handed to the CSV writer and the markdown renderer.
    if not isinstance(headers, list) or not isinstance(rows, list) or not headers or not rows:
        print(" ⚠ extraction returned empty or malformed headers/rows", flush=True)
        return False

    save_csv(csv_path, headers, rows)
    json_path.write_text(json.dumps(parsed, indent=2, ensure_ascii=False), encoding="utf-8")
    print(
        f"{csv_path.relative_to(UFO_ROOT)} ({len(rows)} rows × {len(headers)} cols, {elapsed:.1f}s)",
        flush=True,
    )

    # Update table.md frontmatter
    stamp = utc_now_iso()  # one timestamp so both fields always agree
    fm["csv_path"] = str(csv_path.relative_to(UFO_ROOT))
    fm["json_path"] = str(json_path.relative_to(UFO_ROOT))
    fm["headers"] = headers
    # row_count / column_count are the model's own figures when it reports
    # them; the measured sizes are only the fallback.
    fm["row_count_extracted"] = parsed.get("row_count", len(rows))
    fm["column_count_extracted"] = parsed.get("column_count", len(headers))
    fm["extraction_quality"] = parsed.get("extraction_quality")
    fm["extraction_notes"] = parsed.get("notes", "")
    # NOTE(review): hard-coded label; the CLI is invoked with DEFAULT_MODEL,
    # so confirm the two stay in sync if the model is ever changed.
    fm["extraction_model"] = "claude-haiku-4-5"
    fm["extracted_at"] = stamp
    fm["last_ingest"] = stamp

    body = render_table_md_body(table_id, fm, parsed)
    write_md(md_path, fm, body)
    return True
def main():
    """Command-line entry point.

    Parses ``--table-id`` and ``--force``, makes sure the output directories
    exist, runs ``process_table`` on the selected table page(s) and, when at
    least one table was extracted, appends an entry to the log file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--table-id", help="single table")
    parser.add_argument("--force", action="store_true")
    opts = parser.parse_args()

    for out_dir in (CSV_BASE, CROPS_BASE):
        out_dir.mkdir(parents=True, exist_ok=True)

    # Either the single requested page or every page in the tables directory.
    targets = (
        [TABLES_BASE / f"{opts.table_id}.md"]
        if opts.table_id
        else sorted(TABLES_BASE.glob("*.md"))
    )
    print(f"Processing {len(targets)} table(s)…")

    extracted = 0
    for md_file in targets:
        if not md_file.exists():
            sys.stderr.write(f" ✗ no table.md: {md_file}\n")
            continue
        if process_table(md_file, opts.force):
            extracted += 1
    print(f"\nExtracted: {extracted} table(s)")

    if extracted <= 0:
        return
    with open(LOG_PATH, "a", encoding="utf-8") as log_file:
        entry = (
            f"\n## {utc_now_iso()} — EXTRACT TABLE CSV\n"
            f"- operator: archivist + evidence-officer\n- script: scripts/16-extract-table-csv.py\n"
            f"- tables_extracted: {extracted}\n"
        )
        log_file.write(entry)


if __name__ == "__main__":
    main()