#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Rebuilder for doc-38-143685-box7-incident-summaries-101-172
Uses Gemini 2.0 Flash for vision processing.
143 pages (p-000..p-063, p-100..p-178).
"""

import base64
import json
import os
import re
import sys
import time
import datetime
import concurrent.futures
from pathlib import Path
from PIL import Image

# Suppress Google auth FutureWarnings (must run before the google imports)
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

from google import genai
from google.genai import types

DOC_ID = "doc-38-143685-box7-incident-summaries-101-172"
DOC_TITLE = "USAF UFO/UAP Incident Summary Sheets — Box 7 (Incidents 101-172)"
PNG_DIR = Path(f"/Users/guto/ufo/processing/png/{DOC_ID}")
RAW_DIR = Path(f"/Users/guto/ufo/raw/{DOC_ID}")
CHUNKS_DIR = RAW_DIR / "chunks"
IMAGES_DIR = RAW_DIR / "images"
TABLES_DIR = RAW_DIR / "tables"

GEMINI_API_KEY = os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY")
GEMINI_MODEL = "gemini-2.0-flash"
CALL_TIMEOUT = 120  # seconds per Gemini call

for d in [CHUNKS_DIR, IMAGES_DIR, TABLES_DIR]:
    d.mkdir(parents=True, exist_ok=True)


def get_page_files():
    """Return the sorted page numbers of every ``p-<digits>.png`` in PNG_DIR.

    Files that match the ``p-*.png`` glob but whose stem is not exactly
    ``p-`` followed by digits are skipped instead of raising ``ValueError``.
    """
    pages = []
    for f in PNG_DIR.glob("p-*.png"):
        m = re.fullmatch(r"p-(\d+)", f.stem)
        if m:
            pages.append(int(m.group(1)))
    return sorted(pages)


PAGE_NUMS = get_page_files()
TOTAL_PAGES = len(PAGE_NUMS)


# ── Gemini client (a fresh client per call) ─────────────────────────────────

def make_client():
    """Build a new Gemini client using the API key read from the environment."""
    return genai.Client(api_key=GEMINI_API_KEY)


# ── Prompt ───────────────────────────────────────────────────────────────────

def build_page_prompt(page_file: str, page_number: int) -> str:
    """Return the page-transcription prompt for one scanned page.

    Args:
        page_file: File stem of the page image (e.g. ``p-004``).
        page_number: 1-based sequential position of the page in the run.

    The JSON template at the end of the prompt is pre-filled with
    ``page_number`` and ``page_file`` so the example the model is asked to
    follow is itself well-formed JSON.
    """
    return (
        "You are a page-rebuilder for a UAP/UFO document digitization project.\n"
        "Analyze this scanned page from a declassified USAF UFO incident summary document.\n\n"
        "- Document: USAF UFO/UAP Incident Summary Sheets, Box 7 (Incidents 101-172)\n"
        f"- Page file: {page_file} | Sequential page: {page_number} of {TOTAL_PAGES}\n\n"
        "Chunk types (use ONLY these):\n"
        " letterhead, classification_banner, form_header, field_entry, paragraph_text,\n"
        " redaction, table_marker, image, caption, page_number, signature_block,\n"
        " handwritten_note, stamp, blank, separator\n\n"
        "Rules:\n"
        "- Each numbered form field (1. Date, 2. Time, etc.) = one field_entry chunk\n"
        " EXCEPTION: you may group 2-3 very short consecutive fields into one chunk\n"
        " to stay under token limits, e.g. '1. Date: 30 Jun 1948 | 2. Time: 2140'\n"
        "- classification markings = classification_banner\n"
        "- form title/header line = form_header\n"
        "- stamps (RESTRICTED, DECLASSIFIED, SECRET, etc.) = stamp\n"
        "- photos/sketches/diagrams = image\n"
        "- handwritten annotations = handwritten_note\n"
        "- page number printed = page_number\n"
        "- near-blank pages = one blank chunk\n\n"
        "For content_en: verbatim transcription (English).\n"
        "For content_pt_br: Brazilian Portuguese translation; keep proper nouns/dates verbatim.\n"
        "For blank pages: content_en='[BLANK PAGE]', content_pt_br='[PAGINA EM BRANCO]'.\n"
        "For stamps: transcribe exact text seen.\n\n"
        'bbox: fractions of page width/height, e.g. {"x":0.05,"y":0.10,"w":0.90,"h":0.05}\n\n'
        "RETURN ONLY valid JSON, no markdown fences, no extra text:\n"
        "{\n"
        f' "page_number": {page_number},\n'
        f' "page_file": "{page_file}",\n'
        ' "chunks": [\n'
        " {\n"
        ' "type": "field_entry",\n'
        ' "order_in_page": 1,\n'
        ' "content_en": "1. Date: 30 June 1948",\n'
        ' "content_pt_br": "1. Data: 30 de junho de 1948",\n'
        ' "bbox": {"x":0.05,"y":0.10,"w":0.90,"h":0.05},\n'
        ' "classification": null,\n'
        ' "formatting": [],\n'
        ' "cross_page_hint": "self_contained",\n'
        ' "ocr_confidence": 0.90,\n'
        ' "ocr_source_lines": [3,4],\n'
        ' "redaction_code": null,\n'
        ' "redaction_inferred_content_type": null,\n'
        ' "image_type": null,\n'
        ' "ufo_anomaly_detected": false,\n'
        ' "cryptid_anomaly_detected": false\n'
        " }\n"
        " ]\n"
        "}\n"
    )


def build_image_prompt() -> str:
    """Return the fixed prompt used to analyse one cropped image region."""
    return (
        "You are an image analyst for a UAP/UFO document digitization project.\n"
        "Analyze this cropped region from a declassified USAF document.\n\n"
        "RETURN ONLY valid JSON (no markdown fences):\n"
        "{\n"
        ' "image_description_en": "...",\n'
        ' "image_description_pt_br": "...",\n'
        ' "image_type": "photograph|diagram|sketch|map|chart|stamp_graphic|form_field|text_block",\n'
        ' "extracted_text": "verbatim text or null",\n'
        ' "ufo_anomaly_detected": false,\n'
        ' "ufo_anomaly_type": null,\n'
        ' "ufo_anomaly_rationale": null,\n'
        ' "cryptid_anomaly_detected": false,\n'
        ' "cryptid_anomaly_type": null,\n'
        ' "cryptid_anomaly_rationale": null\n'
        "}\n\n"
        "ufo_anomaly_detected=true ONLY if image shows actual UAP/UFO visual evidence.\n"
        "cryptid_anomaly_detected=true ONLY if image shows unknown creature evidence.\n"
    )


# ── API call with timeout ────────────────────────────────────────────────────

def gemini_call(img_bytes: bytes, prompt: str) -> str:
    """Call Gemini with a PNG image plus a text prompt and return the reply text.

    Args:
        img_bytes: Raw PNG bytes sent as the image part.
        prompt: Text prompt sent after the image.

    Returns:
        The response text; an empty string when the response carries no text.

    Raises:
        concurrent.futures.TimeoutError: No result within ``CALL_TIMEOUT`` seconds.
        Exception: Whatever the client raises for a failed request.
    """
    client = make_client()

    def _call():
        response = client.models.generate_content(
            model=GEMINI_MODEL,
            contents=[
                types.Part.from_bytes(data=img_bytes, mime_type="image/png"),
                prompt,
            ],
        )
        # Normalise a missing text payload so the declared `str` return holds.
        return response.text or ""

    # The executor is deliberately NOT used as a context manager: its __exit__
    # calls shutdown(wait=True), which would block until the request finishes
    # even after result() has timed out, making the timeout ineffective.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        return executor.submit(_call).result(timeout=CALL_TIMEOUT)
    finally:
        executor.shutdown(wait=False)


def parse_json_response(raw: str) -> dict:
    """Strip markdown fences / surrounding prose and parse a JSON object.

    Args:
        raw: Model reply; ``None`` is treated as an empty string.

    Returns:
        The decoded JSON object.

    Raises:
        json.JSONDecodeError: The reply contains no parsable JSON, or the
            parsed value is not a JSON object.
    """
    text = (raw or "").strip()
    if text.startswith("```"):
        text = re.sub(r"^```[A-Za-z]*\n?", "", text)
        text = re.sub(r"\n?```$", "", text.rstrip())
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Second chance: take the outermost {...} span when the model wrapped
        # the object in extra text despite the instructions.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise
        parsed = json.loads(text[start:end + 1])
    if not isinstance(parsed, dict):
        raise json.JSONDecodeError("expected a JSON object", text, 0)
    return parsed


# ── Page processing ──────────────────────────────────────────────────────────

def process_page(task: tuple) -> dict:
    """Transcribe one page with Gemini, retrying up to three times.

    Args:
        task: ``(file_num, seq_idx)`` — the number in the PNG file name and
            the 1-based sequential page index.

    Returns:
        The parsed page dict with ``_file_num``, ``_seq_idx`` and ``page_file``
        set and ``chunks`` normalised to a list of dicts. When the PNG cannot
        be read or every attempt fails, the result of ``_fallback_page``.
    """
    file_num, seq_idx = task
    page_file = f"p-{file_num:03d}"
    png_path = PNG_DIR / f"{page_file}.png"
    prompt = build_page_prompt(page_file, seq_idx)

    try:
        img_bytes = png_path.read_bytes()
    except OSError as e:
        # One unreadable file must not abort the whole run.
        print(f" [ERR] page {seq_idx} cannot read {png_path}: {e}", flush=True)
        return _fallback_page(file_num, seq_idx, page_file, f"READ: {e}")

    max_retries = 3
    reason = "UNKNOWN"
    for attempt in range(max_retries):
        try:
            raw = gemini_call(img_bytes, prompt)
            result = parse_json_response(raw)
        except json.JSONDecodeError as e:
            reason, delay = f"JSON: {e}", 3
            print(f" [WARN] page {seq_idx} JSON error (attempt {attempt+1}): {e}", flush=True)
        except concurrent.futures.TimeoutError:
            reason, delay = "TIMEOUT", 5
            print(f" [TIMEOUT] page {seq_idx} (attempt {attempt+1})", flush=True)
        except Exception as e:
            reason, delay = str(e)[:100], 5
            print(f" [ERR] page {seq_idx} (attempt {attempt+1}): {reason}", flush=True)
        else:
            # Model output is untrusted: keep only dict-shaped chunks so the
            # later numbering/writing steps can rely on `.get`.
            chunks = result.get("chunks")
            if not isinstance(chunks, list):
                chunks = []
            result["chunks"] = [c for c in chunks if isinstance(c, dict)]
            result["_file_num"] = file_num
            result["_seq_idx"] = seq_idx
            result["page_file"] = page_file
            chunk_count = len(result["chunks"])
            print(f" [OK] page {seq_idx:3d}/{TOTAL_PAGES} ({page_file}) — {chunk_count} chunks", flush=True)
            return result
        if attempt < max_retries - 1:
            time.sleep(delay)
    return _fallback_page(file_num, seq_idx, page_file, reason)


def _fallback_page(file_num, seq_idx, page_file, reason):
    """Build a placeholder page holding one full-page ``blank`` chunk.

    The chunk text records ``reason`` and its ``ocr_confidence`` is 0.0.
    """
    return {
        "page_number": seq_idx,
        "page_file": page_file,
        "_file_num": file_num,
        "_seq_idx": seq_idx,
        "chunks": [{
            "type": "blank",
            "order_in_page": 1,
            "content_en": f"[PAGE {seq_idx} ERROR: {reason}]",
            "content_pt_br": f"[PAGINA {seq_idx} ERRO: {reason}]",
            "bbox": {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0},
            "classification": None,
            "formatting": [],
            "cross_page_hint": "self_contained",
            "ocr_confidence": 0.0,
            "ocr_source_lines": [],
            "redaction_code": None,
            "redaction_inferred_content_type": None,
            "image_type": None,
            "ufo_anomaly_detected": False,
            "cryptid_anomaly_detected": False,
        }],
    }


def process_pages_parallel(batch_size: int = 4) -> list:
    """Process every page in ``PAGE_NUMS`` in batches of concurrent workers.

    Args:
        batch_size: Pages processed concurrently per batch; values below 1
            are treated as 1.

    Returns:
        Page result dicts sorted by ``_seq_idx``.
    """
    batch_size = max(1, batch_size)
    tasks = [(file_num, idx + 1) for idx, file_num in enumerate(PAGE_NUMS)]
    results = []
    total_batches = (len(tasks) + batch_size - 1) // batch_size
    print(f"Processing {TOTAL_PAGES} pages in {total_batches} batches of {batch_size}...", flush=True)
    for b_start in range(0, len(tasks), batch_size):
        batch = tasks[b_start:b_start + batch_size]
        b_num = b_start // batch_size + 1
        print(f" Batch {b_num}/{total_batches}: pages {batch[0][1]}–{batch[-1][1]}", flush=True)
        with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as ex:
            futs = [ex.submit(process_page, t) for t in batch]
            for fut in concurrent.futures.as_completed(futs):
                results.append(fut.result())
        # Short pause between batches (skipped after the last one).
        if b_start + batch_size < len(tasks):
            time.sleep(0.5)
    results.sort(key=lambda r: r["_seq_idx"])
    return results


# ── Chunk numbering ──────────────────────────────────────────────────────────

def _order_in_page_key(chunk: dict) -> float:
    """Sort key for ``order_in_page`` that tolerates null / non-numeric values."""
    try:
        return float(chunk.get("order_in_page") or 0)
    except (TypeError, ValueError):
        return 0.0


def assign_global_chunk_ids(page_results: list) -> list:
    """Flatten page results into one chunk list with global ids and links.

    Each chunk dict is mutated in place to receive ``chunk_id`` (``cNNNN``),
    ``order_global``, ``page``, ``page_file``, ``_file_num``, ``prev_chunk``
    and ``next_chunk``. Within a page, chunks are ordered by ``order_in_page``
    (stable; missing or unusable values sort as 0). Non-dict entries are skipped.

    Returns:
        All chunks in global order.
    """
    all_chunks = []
    for pr in page_results:
        seq_idx = pr["_seq_idx"]
        file_num = pr["_file_num"]
        page_file = pr.get("page_file", f"p-{file_num:03d}")
        page_chunks = [c for c in (pr.get("chunks") or []) if isinstance(c, dict)]
        for chunk in sorted(page_chunks, key=_order_in_page_key):
            global_order = len(all_chunks) + 1
            chunk["chunk_id"] = f"c{global_order:04d}"
            chunk["order_global"] = global_order
            chunk["page"] = seq_idx
            chunk["page_file"] = page_file
            chunk["_file_num"] = file_num
            chunk["prev_chunk"] = all_chunks[-1]["chunk_id"] if all_chunks else None
            chunk["next_chunk"] = None
            if all_chunks:
                all_chunks[-1]["next_chunk"] = chunk["chunk_id"]
            all_chunks.append(chunk)
    return all_chunks


# ── Image crop ───────────────────────────────────────────────────────────────

def crop_image_chunk(chunk: dict):
    """Crop the chunk's bbox out of its page PNG into ``IMG-<chunk_id>.png``.

    ``bbox`` values are read as fractions of page width/height. Missing, null
    or non-numeric values fall back to the full-page defaults, and a box that
    collapses to zero area is replaced by the whole page.

    Returns:
        The output path as a string, or ``None`` when cropping failed (a
        warning is printed).
    """
    chunk_id = chunk["chunk_id"]
    file_num = chunk["_file_num"]
    bbox = chunk.get("bbox")
    if not isinstance(bbox, dict):
        bbox = {}
    png_path = PNG_DIR / f"p-{file_num:03d}.png"
    out_path = IMAGES_DIR / f"IMG-{chunk_id}.png"

    def frac(key, default):
        try:
            return float(bbox.get(key, default))
        except (TypeError, ValueError):
            return default

    try:
        # Context manager closes the underlying file handle after the crop.
        with Image.open(png_path) as im:
            W, H = im.size
            x, y = frac("x", 0.0), frac("y", 0.0)
            w, h = frac("w", 1.0), frac("h", 1.0)
            pad = 0.005
            left = max(0, int((x - pad) * W))
            top = max(0, int((y - pad) * H))
            right = min(W, int((x + w + pad) * W))
            bottom = min(H, int((y + h + pad) * H))
            if right <= left or bottom <= top:
                left, top, right, bottom = 0, 0, W, H
            im.crop((left, top, right, bottom)).save(out_path)
        return str(out_path)
    except Exception as e:
        print(f" [WARN] crop {chunk_id}: {e}", flush=True)
        return None


# ── Image analysis ───────────────────────────────────────────────────────────

def analyze_image_chunk(chunk: dict):
    """Run the image-analysis prompt on a cropped chunk and merge the result.

    Does nothing when ``IMG-<chunk_id>.png`` does not exist. Recognised
    analysis keys present in the reply are copied onto ``chunk`` in place;
    any failure is printed as a warning and otherwise ignored.
    """
    chunk_id = chunk["chunk_id"]
    img_path = IMAGES_DIR / f"IMG-{chunk_id}.png"
    if not img_path.exists():
        return
    analysis_keys = (
        "image_description_en", "image_description_pt_br", "image_type",
        "extracted_text",
        "ufo_anomaly_detected", "ufo_anomaly_type", "ufo_anomaly_rationale",
        "cryptid_anomaly_detected", "cryptid_anomaly_type", "cryptid_anomaly_rationale",
    )
    try:
        raw = gemini_call(img_path.read_bytes(), build_image_prompt())
        analysis = parse_json_response(raw)
        chunk.update({k: analysis[k] for k in analysis_keys if k in analysis})
        ufo = chunk.get("ufo_anomaly_detected", False)
        print(f" [IMG] {chunk_id} — ufo={ufo}", flush=True)
    except Exception as e:
        print(f" [WARN] img analysis {chunk_id}: {e}", flush=True)

# ── YAML helper ──────────────────────────────────────────────────────────────

# Plain scalars that YAML parsers commonly resolve to null/bool rather than str.
_YAML_RESERVED_WORDS = {"", "~", "null", "true", "false", "yes", "no", "on", "off"}
_YAML_NUMBER_RE = re.compile(r"[-+]?(\d+\.?\d*|\.\d+)([eE][-+]?\d+)?")
_YAML_SPECIAL_CHARS = set(":#\"'{}[],")
_YAML_LEADING_INDICATORS = set("-?!&*|>%@`")


def _needs_yaml_quotes(s: str) -> bool:
    """Return True when ``s`` is not safe to emit as a plain YAML scalar."""
    if s != s.strip() or s.lower() in _YAML_RESERVED_WORDS:
        return True
    if s[0] in _YAML_LEADING_INDICATORS or _YAML_NUMBER_RE.fullmatch(s):
        return True
    return any(c in _YAML_SPECIAL_CHARS or ord(c) < 32 for c in s)


def yaml_val(v) -> str:
    """Render a Python value as an inline YAML scalar or flow sequence.

    ``None`` → ``null``; bools → ``true``/``false``; ints/floats via ``str``;
    lists → ``[a, b]`` (recursively). Any other value is stringified and,
    when it could be misread as something other than a string, emitted as a
    double-quoted scalar with quotes, backslashes and control characters
    (including newlines) escaped.
    """
    if v is None:
        return "null"
    if isinstance(v, bool):
        return "true" if v else "false"
    if isinstance(v, (int, float)):
        return str(v)
    if isinstance(v, list):
        if not v:
            return "[]"
        return "[" + ", ".join(yaml_val(i) for i in v) + "]"
    s = str(v)
    if _needs_yaml_quotes(s):
        # JSON string syntax is used here as the double-quoted YAML form.
        return json.dumps(s, ensure_ascii=False)
    return s


def _bbox_floats(chunk: dict) -> tuple:
    """Return the chunk's bbox as ``(x, y, w, h)`` floats.

    A missing / non-dict bbox, or a null / non-numeric component, falls back
    to the full-page defaults ``(0.0, 0.0, 1.0, 1.0)`` per component.
    """
    bbox = chunk.get("bbox")
    if not isinstance(bbox, dict):
        bbox = {}
    values = []
    for key, default in (("x", 0.0), ("y", 0.0), ("w", 1.0), ("h", 1.0)):
        try:
            values.append(float(bbox.get(key, default)))
        except (TypeError, ValueError):
            values.append(default)
    return tuple(values)


def _ocr_confidence(chunk: dict) -> float:
    """Return ``ocr_confidence`` as a float, defaulting to 0.85 only when absent.

    An explicit 0.0 (as set on error placeholder pages) is preserved.
    """
    raw = chunk.get("ocr_confidence")
    if raw is None:
        return 0.85
    try:
        return float(raw)
    except (TypeError, ValueError):
        return 0.85


# ── Write chunk file ─────────────────────────────────────────────────────────

def write_chunk_file(chunk: dict):
    """Write ``chunks/<chunk_id>.md``: YAML front matter plus EN / PT-BR text.

    Image chunks additionally get an image link and, when available, the
    English image description.
    """
    chunk_id = chunk["chunk_id"]
    page = chunk["page"]
    page_file = chunk.get("page_file", "p-000")
    ctype = chunk.get("type") or "blank"
    bx, by, bw, bh = _bbox_floats(chunk)
    related_image = f"IMG-{chunk_id}.png" if ctype == "image" else "null"
    related_table = yaml_val(chunk.get("related_table"))
    cross_page_hint = yaml_val(chunk.get("cross_page_hint") or "self_contained")

    lines = [
        "---",
        f"chunk_id: {chunk_id}",
        f"type: {ctype}",
        f"page: {page}",
        f"order_in_page: {chunk.get('order_in_page', 1)}",
        f"order_global: {chunk.get('order_global', 1)}",
        f"bbox: {{x: {bx:.3f}, y: {by:.3f}, w: {bw:.3f}, h: {bh:.3f}}}",
        f"classification: {yaml_val(chunk.get('classification'))}",
        f"formatting: {yaml_val(chunk.get('formatting', []))}",
        f"cross_page_hint: {cross_page_hint}",
        f"prev_chunk: {yaml_val(chunk.get('prev_chunk'))}",
        f"next_chunk: {yaml_val(chunk.get('next_chunk'))}",
        f"related_image: {related_image}",
        f"related_table: {related_table}",
        f"ocr_confidence: {_ocr_confidence(chunk):.2f}",
        f"ocr_source_lines: {yaml_val(chunk.get('ocr_source_lines', []))}",
        f"redaction_code: {yaml_val(chunk.get('redaction_code'))}",
        f"redaction_inferred_content_type: {yaml_val(chunk.get('redaction_inferred_content_type'))}",
        f"image_type: {yaml_val(chunk.get('image_type'))}",
        f"ufo_anomaly_detected: {yaml_val(chunk.get('ufo_anomaly_detected', False))}",
        f"cryptid_anomaly_detected: {yaml_val(chunk.get('cryptid_anomaly_detected', False))}",
        f"ufo_anomaly_type: {yaml_val(chunk.get('ufo_anomaly_type'))}",
        f"ufo_anomaly_rationale: {yaml_val(chunk.get('ufo_anomaly_rationale'))}",
        f"cryptid_anomaly_type: {yaml_val(chunk.get('cryptid_anomaly_type'))}",
        f"cryptid_anomaly_rationale: {yaml_val(chunk.get('cryptid_anomaly_rationale'))}",
        f"image_description_en: {yaml_val(chunk.get('image_description_en'))}",
        f"image_description_pt_br: {yaml_val(chunk.get('image_description_pt_br'))}",
        f"extracted_text: {yaml_val(chunk.get('extracted_text'))}",
        f"source_png: ../../processing/png/{DOC_ID}/{page_file}.png",
        "---",
        "",
        f"**EN:** {chunk.get('content_en') or ''}",
        "",
        f"**PT-BR:** {chunk.get('content_pt_br') or ''}",
        "",
    ]
    if ctype == "image":
        lines += [
            f"![{chunk_id} image](../images/IMG-{chunk_id}.png)",
            "",
        ]
        if chunk.get("image_description_en"):
            lines += [f"*{chunk['image_description_en']}*", ""]

    (CHUNKS_DIR / f"{chunk_id}.md").write_text("\n".join(lines), encoding="utf-8")


# ── Write _index.json ────────────────────────────────────────────────────────

def write_index_json(all_chunks: list, build_at: str):
    """Write ``_index.json``: document metadata plus one summary row per chunk.

    Each row carries the chunk id, type, page, ordering, relative file path,
    bbox and an 80-character preview of the English content.
    """
    full_page = {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0}
    entries = []
    for chunk in all_chunks:
        cid = chunk["chunk_id"]
        content_en = chunk.get("content_en") or ""
        preview = content_en[:80] + ("..." if len(content_en) > 80 else "")
        bbox = chunk.get("bbox")
        if not bbox or not isinstance(bbox, dict):
            bbox = dict(full_page)
        entries.append({
            "chunk_id": cid,
            "type": chunk.get("type") or "blank",
            "page": chunk["page"],
            "order_in_page": chunk.get("order_in_page", 1),
            "order_global": chunk.get("order_global", 1),
            "file": f"chunks/{cid}.md",
            "bbox": bbox,
            "preview": preview,
        })

    index = {
        "doc_id": DOC_ID,
        "schema_version": "0.2.0",
        "total_pages": TOTAL_PAGES,
        "total_chunks": len(all_chunks),
        "build_approach": "subagents",
        "build_model": GEMINI_MODEL,
        "build_at": build_at,
        "chunks": entries,
    }
    out = RAW_DIR / "_index.json"
    out.write_text(json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f" Written: {out}", flush=True)


# ── Write document.md ────────────────────────────────────────────────────────

def write_document_md(all_chunks: list, build_at: str) -> int:
    """Write ``document.md``: YAML header followed by every chunk, page by page.

    Returns:
        Size of the written file in bytes.
    """
    type_hist: dict = {}
    ufo_flagged = []
    cryptid_flagged = []
    pages_dict: dict = {}
    for chunk in all_chunks:
        t = chunk.get("type") or "blank"
        type_hist[t] = type_hist.get(t, 0) + 1
        if chunk.get("ufo_anomaly_detected"):
            ufo_flagged.append(chunk["chunk_id"])
        if chunk.get("cryptid_anomaly_detected"):
            cryptid_flagged.append(chunk["chunk_id"])
        pages_dict.setdefault(chunk["page"], []).append(chunk)

    hist = "\n".join(f" {k}: {v}" for k, v in sorted(type_hist.items()))
    header = (
        "---\n"
        'schema_version: "0.2.0"\n'
        "type: master_document\n"
        f"doc_id: {DOC_ID}\n"
        f'canonical_title: "{DOC_TITLE}"\n'
        f"total_pages: {TOTAL_PAGES}\n"
        f"total_chunks: {len(all_chunks)}\n"
        "chunk_types_histogram:\n"
        f"{hist}\n"
        "multi_page_tables: []\n"
        f"ufo_anomalies_flagged: [{', '.join(ufo_flagged)}]\n"
        f"cryptid_anomalies_flagged: [{', '.join(cryptid_flagged)}]\n"
        'build_approach: "subagents"\n'
        f"build_model: {GEMINI_MODEL}\n"
        f"build_at: {build_at}\n"
        "---\n\n"
        f"# {DOC_TITLE}\n\n"
    )

    body_parts = []
    for page_num in sorted(pages_dict):
        body_parts.append(f"## Page {page_num}\n\n")
        # order_global is an int assigned by this script, so sorting on it
        # cannot fail on null / string values the way order_in_page can.
        for chunk in sorted(pages_dict[page_num], key=lambda c: c.get("order_global", 0)):
            cid = chunk["chunk_id"]
            ctype = chunk.get("type") or "blank"
            bx, by, bw, bh = _bbox_floats(chunk)
            bs = f"{bx:.2f}/{by:.2f}/{bw:.2f}/{bh:.2f}"
            section = [
                "",
                "",
                f"### Chunk {cid} — {ctype} · p{page_num} · bbox: {bs}",
                "",
                f"**EN:** {chunk.get('content_en') or ''}",
                "",
                f"**PT-BR:** {chunk.get('content_pt_br') or ''}",
                "",
            ]
            if ctype == "image":
                section += [f"![{cid} image](./images/IMG-{cid}.png)", ""]
                if chunk.get("image_description_en"):
                    section += [f"*EN: {chunk['image_description_en']}*", ""]
                if chunk.get("image_description_pt_br"):
                    section += [f"*PT-BR: {chunk['image_description_pt_br']}*", ""]
            # Everything except the text bodies and private "_" keys.
            meta = {
                k: v for k, v in chunk.items()
                if not k.startswith("_") and k not in ("content_en", "content_pt_br")
            }
            section += [
                "<details><summary>metadata</summary>",
                "",
                "```json",
                json.dumps(meta, indent=2, ensure_ascii=False),
                "```",
                "",
                "</details>",
                "",
                "---",
                "",
            ]
            body_parts.append("\n".join(section))

    out = RAW_DIR / "document.md"
    out.write_text(header + "".join(body_parts), encoding="utf-8")
    size = out.stat().st_size
    print(f" Written: {out} ({size} bytes)", flush=True)
    return size


# ── Main ─────────────────────────────────────────────────────────────────────

CHECKPOINT_FILE = RAW_DIR / "_checkpoint_pages.json"


def _load_checkpoint():
    """Return the checkpointed page results, or ``None`` if absent or unusable."""
    if not CHECKPOINT_FILE.exists():
        return None
    try:
        pages = json.loads(CHECKPOINT_FILE.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as e:
        print(f" [WARN] checkpoint unreadable ({e}); reprocessing pages.", flush=True)
        return None
    if not isinstance(pages, list):
        print(" [WARN] checkpoint has unexpected shape; reprocessing pages.", flush=True)
        return None
    return pages


def _save_checkpoint(page_results: list):
    """Write the checkpoint via a temp file so an interrupted write cannot truncate it."""
    tmp = CHECKPOINT_FILE.with_name(CHECKPOINT_FILE.name + ".tmp")
    tmp.write_text(json.dumps(page_results, ensure_ascii=False), encoding="utf-8")
    tmp.replace(CHECKPOINT_FILE)


def main():
    """Run the full rebuild: pages → chunk ids → crops → image analysis → outputs."""
    t0 = time.time()
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    build_at = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    print(f"=== Rebuilding {DOC_ID} ===", flush=True)
    print(f"Total pages: {TOTAL_PAGES} | Gemini model: {GEMINI_MODEL}", flush=True)
    print()

    # Step 1 — process pages (with checkpoint support)
    page_results = _load_checkpoint()
    if page_results is not None:
        print("STEP 1: Loading from checkpoint...", flush=True)
        print(f" Loaded {len(page_results)} pages from checkpoint.", flush=True)
        if len(page_results) != TOTAL_PAGES:
            print(
                f" [WARN] checkpoint has {len(page_results)} pages but "
                f"{TOTAL_PAGES} PNGs were found.",
                flush=True,
            )
    else:
        print("STEP 1: Processing pages...", flush=True)
        page_results = process_pages_parallel(batch_size=4)
        _save_checkpoint(page_results)
        print(f" Done. {len(page_results)} pages processed. Checkpoint saved.", flush=True)
    print()

    # Step 2 — assign chunk IDs
    print("STEP 2: Assigning chunk IDs...", flush=True)
    all_chunks = assign_global_chunk_ids(page_results)
    print(f" Total chunks: {len(all_chunks)}", flush=True)
    print()

    # Step 3 — crop images
    image_chunks = [c for c in all_chunks if c.get("type") == "image"]
    print(f"STEP 3: Cropping {len(image_chunks)} image chunks...", flush=True)
    for chunk in image_chunks:
        crop_image_chunk(chunk)
    print()

    # Step 4 — analyze images in batches of 4
    print(f"STEP 4: Analyzing {len(image_chunks)} images...", flush=True)
    for b in range(0, len(image_chunks), 4):
        batch = image_chunks[b:b + 4]
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
            list(ex.map(analyze_image_chunk, batch))
        if b + 4 < len(image_chunks):
            time.sleep(0.5)
    print()

    # Step 5 — write chunk files
    print("STEP 5: Writing chunk files...", flush=True)
    for chunk in all_chunks:
        write_chunk_file(chunk)
    print(f" Written {len(all_chunks)} chunk files.", flush=True)
    print()

    # Step 6 — write index
    print("STEP 6: Writing _index.json...", flush=True)
    write_index_json(all_chunks, build_at)
    print()

    # Step 7 — write document.md
    print("STEP 7: Writing document.md...", flush=True)
    doc_bytes = write_document_md(all_chunks, build_at)
    print()

    wall = int(time.time() - t0)
    num_images = len(image_chunks)
    num_ufo = sum(1 for c in all_chunks if c.get("ufo_anomaly_detected"))
    num_cryptid = sum(1 for c in all_chunks if c.get("cryptid_anomaly_detected"))

    print("=== DONE ===", flush=True)
    print(
        f"pages_done={TOTAL_PAGES}, chunks_total={len(all_chunks)}, "
        f"images_extracted={num_images}, tables_stitched=0, "
        f"ufo_anomalies={num_ufo}, cryptid_anomalies={num_cryptid}, "
        f"wall_seconds={wall}",
        flush=True,
    )


if __name__ == "__main__":
    main()