#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Rebuilder for doc-65-hs1-834228961-62-hq-83894-section-6 Uses Gemini 2.0 Flash for vision processing. 236 pages (p-000..p-063, p-100..p-271 with gap p-064..p-099). """ import json import os import re import time import datetime import concurrent.futures from pathlib import Path from PIL import Image import warnings warnings.filterwarnings("ignore", category=FutureWarning) from google import genai from google.genai import types DOC_ID = "doc-65-hs1-834228961-62-hq-83894-section-6" DOC_TITLE = "65 HS1-834228961 62-HQ-83894 Section 6 (FBI UAP/UFO Investigative File)" PNG_DIR = Path(f"/Users/guto/ufo/processing/png/{DOC_ID}") RAW_DIR = Path(f"/Users/guto/ufo/raw/{DOC_ID}") CHUNKS_DIR = RAW_DIR / "chunks" IMAGES_DIR = RAW_DIR / "images" TABLES_DIR = RAW_DIR / "tables" GEMINI_API_KEY = os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY") GEMINI_MODEL = "gemini-2.0-flash" CALL_TIMEOUT = 180 BATCH_SIZE = 4 MAX_OUTPUT_TOKENS = 8192 for d in [CHUNKS_DIR, IMAGES_DIR, TABLES_DIR]: d.mkdir(parents=True, exist_ok=True) def get_page_files(): pages = [] for f in sorted(PNG_DIR.glob("p-*.png")): num = int(f.stem.split("-")[1]) pages.append(num) return sorted(pages) PAGE_NUMS = get_page_files() TOTAL_PAGES = len(PAGE_NUMS) def make_client(): return genai.Client(api_key=GEMINI_API_KEY) # Compact prompt: omit null fields from JSON template to reduce token waste def build_page_prompt(page_file: str, page_number: int) -> str: return ( "You are a page-rebuilder for a UAP/UFO document digitization project.\n" "Analyze this scanned page from a declassified FBI UAP/UFO investigative document.\n\n" f"Document: {DOC_TITLE}\n" f"Page {page_number} of {TOTAL_PAGES} ({page_file})\n\n" "CHUNK TYPES (ONLY these): letterhead, classification_banner, header, subheader,\n" "paragraph, list_item, caption, footnote, page_number, signature_block, stamp,\n" "redaction_block, image, table_marker, form_field, blank, handwritten_note, section_title\n\n" "RULES:\n" "- One chunk per distinct visual element, ordered top-to-bottom\n" "- content_en: verbatim text (English) or description for non-text\n" "- content_pt_br: Brazilian Portuguese (pt-br) translation; keep proper nouns/codes\n" "- bbox: {x,y,w,h} as fractions 0.0-1.0 of page size\n" "- Redacted blocks: type=redaction_block, content_en=[REDACTED]\n" "- Images/photos/diagrams: type=image\n" "- Blank/near-blank pages: ONE chunk type=blank\n" "- IMPORTANT: If page has many elements, group related paragraphs to stay under token limit\n\n" "RETURN ONLY valid JSON:\n" "{\"page_number\":,\"page_file\":\"p-NNN\",\"chunks\":[\n" "{\"type\":\"paragraph\",\"order_in_page\":1,\n" "\"content_en\":\"...\",\"content_pt_br\":\"...\",\n" "\"bbox\":{\"x\":0.05,\"y\":0.10,\"w\":0.90,\"h\":0.05},\n" "\"classification\":null,\"formatting\":[],\n" "\"cross_page_hint\":\"self_contained\",\n" "\"ocr_confidence\":0.85,\"ocr_source_lines\":[],\n" "\"redaction_code\":null,\"redaction_inferred_content_type\":null,\n" "\"image_type\":null,\n" "\"ufo_anomaly_detected\":false,\"ufo_anomaly_type\":null,\"ufo_anomaly_rationale\":null,\n" "\"cryptid_anomaly_detected\":false,\"cryptid_anomaly_type\":null,\"cryptid_anomaly_rationale\":null,\n" "\"image_description_en\":null,\"image_description_pt_br\":null,\"extracted_text\":null}\n" "]}" ) def build_image_prompt() -> str: return ( "Analyze this cropped region from a declassified FBI document.\n" "RETURN ONLY valid JSON:\n" "{\"image_description_en\":\"...\",\"image_description_pt_br\":\"...\"," "\"image_type\":\"photograph|diagram|sketch|map|chart|stamp_graphic|seal|signature|other\"," "\"extracted_text\":null," "\"ufo_anomaly_detected\":false,\"ufo_anomaly_type\":null,\"ufo_anomaly_rationale\":null," "\"cryptid_anomaly_detected\":false,\"cryptid_anomaly_type\":null,\"cryptid_anomaly_rationale\":null}" ) def gemini_call(img_bytes: bytes, prompt: str) -> str: client = make_client() def _call(): response = client.models.generate_content( model=GEMINI_MODEL, contents=[ types.Part.from_bytes(data=img_bytes, mime_type="image/png"), prompt, ], config=types.GenerateContentConfig( max_output_tokens=MAX_OUTPUT_TOKENS, temperature=0.1, ), ) return response.text with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex: fut = ex.submit(_call) return fut.result(timeout=CALL_TIMEOUT) def try_repair_truncated_json(raw, page_number, page_file): """Try to extract complete chunks from truncated JSON by finding complete objects.""" try: # Find all complete chunk objects using regex on the chunks array # A complete chunk has matching braces chunks_match = re.search(r'"chunks"\s*:\s*\[(.+)', raw, re.DOTALL) if not chunks_match: return None chunks_text = chunks_match.group(1) chunks = [] depth = 0 start = -1 for i, c in enumerate(chunks_text): if c == '{': if depth == 0: start = i depth += 1 elif c == '}': depth -= 1 if depth == 0 and start >= 0: chunk_str = chunks_text[start:i + 1] try: chunk = json.loads(chunk_str) chunks.append(chunk) except json.JSONDecodeError: pass start = -1 if chunks: print(f" [REPAIR] Extracted {len(chunks)} complete chunks from truncated response", flush=True) return { "page_number": page_number, "page_file": page_file, "chunks": chunks, } except Exception: pass return None def parse_json_response(raw: str, page_number: int = 0, page_file: str = "") -> dict: raw = raw.strip() if raw.startswith("```"): raw = re.sub(r"^```[a-z]*\n?", "", raw) raw = re.sub(r"\n?```$", "", raw.rstrip()) try: return json.loads(raw) except json.JSONDecodeError: # Try repair on truncated response repaired = try_repair_truncated_json(raw, page_number, page_file) if repaired and repaired.get("chunks"): return repaired raise def process_page(task: tuple) -> dict: file_num, seq_idx = task page_file = f"p-{file_num:03d}" png_path = PNG_DIR / f"{page_file}.png" prompt = build_page_prompt(page_file, seq_idx) with open(png_path, "rb") as f: img_bytes = f.read() max_retries = 3 for attempt in range(max_retries): try: raw = gemini_call(img_bytes, prompt) result = parse_json_response(raw, seq_idx, page_file) result["_file_num"] = file_num result["_seq_idx"] = seq_idx result["page_file"] = page_file chunk_count = len(result.get("chunks", [])) print(f" [OK] page {seq_idx:3d}/{TOTAL_PAGES} ({page_file}) — {chunk_count} chunks", flush=True) return result except json.JSONDecodeError as e: print(f" [WARN] page {seq_idx} JSON error (attempt {attempt+1}): {e}", flush=True) if attempt == max_retries - 1: return _fallback_page(file_num, seq_idx, page_file, f"JSON: {e}") time.sleep(3) except concurrent.futures.TimeoutError: print(f" [TIMEOUT] page {seq_idx} (attempt {attempt+1})", flush=True) if attempt == max_retries - 1: return _fallback_page(file_num, seq_idx, page_file, "TIMEOUT") time.sleep(5) except Exception as e: msg = str(e)[:100] print(f" [ERR] page {seq_idx} (attempt {attempt+1}): {msg}", flush=True) if attempt == max_retries - 1: return _fallback_page(file_num, seq_idx, page_file, msg) time.sleep(5) def _fallback_page(file_num, seq_idx, page_file, reason): return { "page_number": seq_idx, "page_file": page_file, "_file_num": file_num, "_seq_idx": seq_idx, "chunks": [{ "type": "blank", "order_in_page": 1, "content_en": f"[PAGE {seq_idx} ERROR: {reason}]", "content_pt_br": f"[PAGINA {seq_idx} ERRO: {reason}]", "bbox": {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0}, "classification": None, "formatting": [], "cross_page_hint": "self_contained", "ocr_confidence": 0.0, "ocr_source_lines": [], "redaction_code": None, "redaction_inferred_content_type": None, "image_type": None, "ufo_anomaly_detected": False, "ufo_anomaly_type": None, "ufo_anomaly_rationale": None, "cryptid_anomaly_detected": False, "cryptid_anomaly_type": None, "cryptid_anomaly_rationale": None, "image_description_en": None, "image_description_pt_br": None, "extracted_text": None, }], } def process_pages_parallel(batch_size: int = BATCH_SIZE) -> list: tasks = [(file_num, idx + 1) for idx, file_num in enumerate(PAGE_NUMS)] results = [] total_batches = (len(tasks) + batch_size - 1) // batch_size print(f"Processing {TOTAL_PAGES} pages in {total_batches} batches of {batch_size}...", flush=True) for b_start in range(0, len(tasks), batch_size): batch = tasks[b_start:b_start + batch_size] b_num = b_start // batch_size + 1 print(f" Batch {b_num}/{total_batches}: pages {batch[0][1]}-{batch[-1][1]}", flush=True) with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as ex: futs = {ex.submit(process_page, t): t for t in batch} for fut in concurrent.futures.as_completed(futs): results.append(fut.result()) if b_start + batch_size < len(tasks): time.sleep(0.5) results.sort(key=lambda r: r["_seq_idx"]) return results def assign_global_chunk_ids(page_results: list) -> list: global_order = 0 all_chunks = [] for pr in page_results: seq_idx = pr["_seq_idx"] file_num = pr["_file_num"] page_file = pr.get("page_file", f"p-{file_num:03d}") chunks = sorted(pr.get("chunks", []), key=lambda c: c.get("order_in_page", 0)) for chunk in chunks: global_order += 1 chunk_id = f"c{global_order:04d}" chunk["chunk_id"] = chunk_id chunk["order_global"] = global_order chunk["page"] = seq_idx chunk["page_file"] = page_file chunk["_file_num"] = file_num chunk["prev_chunk"] = f"c{global_order-1:04d}" if global_order > 1 else None chunk["next_chunk"] = None chunk.setdefault("related_table", None) all_chunks.append(chunk) for i in range(len(all_chunks) - 1): all_chunks[i]["next_chunk"] = all_chunks[i + 1]["chunk_id"] return all_chunks def crop_image_chunk(chunk: dict): chunk_id = chunk["chunk_id"] file_num = chunk["_file_num"] bbox = chunk.get("bbox") or {} if not isinstance(bbox, dict): bbox = {} png_path = PNG_DIR / f"p-{file_num:03d}.png" out_path = IMAGES_DIR / f"IMG-{chunk_id}.png" try: im = Image.open(png_path) W, H = im.size x = float(bbox.get("x", 0.0)) y = float(bbox.get("y", 0.0)) w = float(bbox.get("w", 1.0)) h = float(bbox.get("h", 1.0)) pad = 0.005 left = max(0, int((x - pad) * W)) top = max(0, int((y - pad) * H)) right = min(W, int((x + w + pad) * W)) bottom = min(H, int((y + h + pad) * H)) if right - left < 5: right = min(W, left + 50) if bottom - top < 5: bottom = min(H, top + 50) im.crop((left, top, right, bottom)).save(out_path) return str(out_path) except Exception as e: print(f" [WARN] crop {chunk_id}: {e}", flush=True) return None def analyze_image_chunk(chunk: dict): chunk_id = chunk["chunk_id"] img_path = IMAGES_DIR / f"IMG-{chunk_id}.png" if not img_path.exists(): return try: with open(img_path, "rb") as f: img_bytes = f.read() raw = gemini_call(img_bytes, build_image_prompt()) analysis = parse_json_response(raw) for key in ["image_description_en", "image_description_pt_br", "image_type", "extracted_text", "ufo_anomaly_detected", "ufo_anomaly_type", "ufo_anomaly_rationale", "cryptid_anomaly_detected", "cryptid_anomaly_type", "cryptid_anomaly_rationale"]: if key in analysis: chunk[key] = analysis[key] ufo = chunk.get("ufo_anomaly_detected", False) print(f" [IMG] {chunk_id} — ufo={ufo}", flush=True) except Exception as e: print(f" [WARN] img analysis {chunk_id}: {e}", flush=True) def yaml_val(v) -> str: if v is None: return "null" if isinstance(v, bool): return "true" if v else "false" if isinstance(v, (int, float)): return str(v) if isinstance(v, list): if not v: return "[]" return "[" + ", ".join(yaml_val(i) for i in v) + "]" s = str(v) if any(c in s for c in [':', '#', '"', "'", '\n', '{', '}']): return '"' + s.replace('\\', '\\\\').replace('"', '\\"') + '"' return s def write_chunk_file(chunk: dict): chunk_id = chunk["chunk_id"] page = chunk["page"] page_file = chunk.get("page_file", "p-000") ctype = chunk.get("type", "blank") bbox = chunk.get("bbox") or {} if not isinstance(bbox, dict): bbox = {} bx = float(bbox.get("x", 0.0)) by = float(bbox.get("y", 0.0)) bw = float(bbox.get("w", 1.0)) bh = float(bbox.get("h", 1.0)) related_image = f"IMG-{chunk_id}.png" if ctype == "image" else "null" related_table = yaml_val(chunk.get("related_table")) lines = [ "---", f"chunk_id: {chunk_id}", f"type: {ctype}", f"page: {page}", f"order_in_page: {chunk.get('order_in_page', 1)}", f"order_global: {chunk.get('order_global', 1)}", f"bbox: {{x: {bx:.3f}, y: {by:.3f}, w: {bw:.3f}, h: {bh:.3f}}}", f"classification: {yaml_val(chunk.get('classification'))}", f"formatting: {yaml_val(chunk.get('formatting', []))}", f"cross_page_hint: {chunk.get('cross_page_hint', 'self_contained')}", f"prev_chunk: {yaml_val(chunk.get('prev_chunk'))}", f"next_chunk: {yaml_val(chunk.get('next_chunk'))}", f"related_image: {related_image}", f"related_table: {related_table}", f"ocr_confidence: {float(chunk.get('ocr_confidence') or 0.85):.2f}", f"ocr_source_lines: {yaml_val(chunk.get('ocr_source_lines', []))}", f"redaction_code: {yaml_val(chunk.get('redaction_code'))}", f"redaction_inferred_content_type: {yaml_val(chunk.get('redaction_inferred_content_type'))}", f"image_type: {yaml_val(chunk.get('image_type'))}", f"ufo_anomaly_detected: {yaml_val(chunk.get('ufo_anomaly_detected', False))}", f"cryptid_anomaly_detected: {yaml_val(chunk.get('cryptid_anomaly_detected', False))}", f"ufo_anomaly_type: {yaml_val(chunk.get('ufo_anomaly_type'))}", f"ufo_anomaly_rationale: {yaml_val(chunk.get('ufo_anomaly_rationale'))}", f"cryptid_anomaly_type: {yaml_val(chunk.get('cryptid_anomaly_type'))}", f"cryptid_anomaly_rationale: {yaml_val(chunk.get('cryptid_anomaly_rationale'))}", f"image_description_en: {yaml_val(chunk.get('image_description_en'))}", f"image_description_pt_br: {yaml_val(chunk.get('image_description_pt_br'))}", f"extracted_text: {yaml_val(chunk.get('extracted_text'))}", f"source_png: ../../processing/png/{DOC_ID}/{page_file}.png", "---", "", f"**EN:** {chunk.get('content_en') or ''}", "", f"**PT-BR:** {chunk.get('content_pt_br') or ''}", "", ] if ctype == "image": lines += [ f"![{chunk_id} image](../images/IMG-{chunk_id}.png)", "", ] if chunk.get("image_description_en"): lines += [f"*{chunk['image_description_en']}*", ""] (CHUNKS_DIR / f"{chunk_id}.md").write_text("\n".join(lines), encoding="utf-8") def write_index_json(all_chunks: list, build_at: str): index = { "doc_id": DOC_ID, "schema_version": "0.2.0", "total_pages": TOTAL_PAGES, "total_chunks": len(all_chunks), "build_approach": "subagents", "build_model": GEMINI_MODEL, "build_at": build_at, "chunks": [], } for chunk in all_chunks: cid = chunk["chunk_id"] content_en = chunk.get("content_en") or "" preview = content_en[:80] + ("..." if len(content_en) > 80 else "") bbox = chunk.get("bbox") or {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0} if not isinstance(bbox, dict): bbox = {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0} index["chunks"].append({ "chunk_id": cid, "type": chunk.get("type", "blank"), "page": chunk["page"], "order_in_page": chunk.get("order_in_page", 1), "order_global": chunk.get("order_global", 1), "file": f"chunks/{cid}.md", "bbox": bbox, "preview": preview, }) out = RAW_DIR / "_index.json" out.write_text(json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8") print(f" Written: {out}", flush=True) def write_document_md(all_chunks: list, build_at: str) -> int: type_hist: dict = {} ufo_flagged = [] cryptid_flagged = [] for chunk in all_chunks: t = chunk.get("type", "blank") type_hist[t] = type_hist.get(t, 0) + 1 if chunk.get("ufo_anomaly_detected"): ufo_flagged.append(chunk["chunk_id"]) if chunk.get("cryptid_anomaly_detected"): cryptid_flagged.append(chunk["chunk_id"]) hist = "\n".join(f" {k}: {v}" for k, v in sorted(type_hist.items())) header = ( "---\n" 'schema_version: "0.2.0"\n' "type: master_document\n" f"doc_id: {DOC_ID}\n" f'canonical_title: "{DOC_TITLE}"\n' f"total_pages: {TOTAL_PAGES}\n" f"total_chunks: {len(all_chunks)}\n" "chunk_types_histogram:\n" f"{hist}\n" "multi_page_tables: []\n" f"ufo_anomalies_flagged: [{', '.join(ufo_flagged)}]\n" f"cryptid_anomalies_flagged: [{', '.join(cryptid_flagged)}]\n" 'build_approach: "subagents"\n' f"build_model: {GEMINI_MODEL}\n" f"build_at: {build_at}\n" "---\n\n" f"# {DOC_TITLE}\n\n" ) pages_dict: dict = {} for chunk in all_chunks: p = chunk["page"] pages_dict.setdefault(p, []).append(chunk) body_parts = [] for page_num in sorted(pages_dict): body_parts.append(f"## Page {page_num}\n\n") for chunk in sorted(pages_dict[page_num], key=lambda c: c.get("order_in_page", 0)): cid = chunk["chunk_id"] ctype = chunk.get("type", "blank") bbox = chunk.get("bbox") or {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0} if not isinstance(bbox, dict): bbox = {"x": 0.0, "y": 0.0, "w": 1.0, "h": 1.0} bs = ( f"{float(bbox.get('x', 0)):.2f}/" f"{float(bbox.get('y', 0)):.2f}/" f"{float(bbox.get('w', 1)):.2f}/" f"{float(bbox.get('h', 1)):.2f}" ) section = [ f"", f'', f"### Chunk {cid} — {ctype} · p{page_num} · bbox: {bs}", "", f"**EN:** {chunk.get('content_en') or ''}", "", f"**PT-BR:** {chunk.get('content_pt_br') or ''}", "", ] if ctype == "image": section += [f"![{cid} image](./images/IMG-{cid}.png)", ""] if chunk.get("image_description_en"): section += [f"*EN: {chunk['image_description_en']}*", ""] if chunk.get("image_description_pt_br"): section += [f"*PT-BR: {chunk['image_description_pt_br']}*", ""] meta = {k: v for k, v in chunk.items() if not k.startswith("_") and k not in ("content_en", "content_pt_br")} section += [ "
metadata", "", "```json", json.dumps(meta, indent=2, ensure_ascii=False), "```", "", "
", "", "---", "", ] body_parts.append("\n".join(section)) out = RAW_DIR / "document.md" out.write_text(header + "".join(body_parts), encoding="utf-8") size = out.stat().st_size print(f" Written: {out} ({size:,} bytes)", flush=True) return size CHECKPOINT_FILE = RAW_DIR / "_checkpoint_pages.json" def main(): t0 = time.time() build_at = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") print(f"=== Rebuilding {DOC_ID} ===", flush=True) print(f"Total pages: {TOTAL_PAGES} | Model: {GEMINI_MODEL} | max_output_tokens: {MAX_OUTPUT_TOKENS}", flush=True) print() # Step 1 — process pages (with checkpoint support) if CHECKPOINT_FILE.exists(): print("STEP 1: Loading from checkpoint...", flush=True) page_results = json.loads(CHECKPOINT_FILE.read_text(encoding="utf-8")) print(f" Loaded {len(page_results)} pages from checkpoint.", flush=True) else: print("STEP 1: Processing pages...", flush=True) page_results = process_pages_parallel(batch_size=BATCH_SIZE) CHECKPOINT_FILE.write_text(json.dumps(page_results, ensure_ascii=False), encoding="utf-8") print(f" Done. {len(page_results)} pages processed. Checkpoint saved.", flush=True) print() # Step 2 — assign chunk IDs print("STEP 2: Assigning chunk IDs...", flush=True) all_chunks = assign_global_chunk_ids(page_results) print(f" Total chunks: {len(all_chunks)}", flush=True) print() # Step 3 — crop images image_chunks = [c for c in all_chunks if c.get("type") == "image"] print(f"STEP 3: Cropping {len(image_chunks)} image chunks...", flush=True) for chunk in image_chunks: crop_image_chunk(chunk) print() # Step 4 — analyze images in batches print(f"STEP 4: Analyzing {len(image_chunks)} images...", flush=True) for b in range(0, len(image_chunks), BATCH_SIZE): batch = image_chunks[b:b + BATCH_SIZE] with concurrent.futures.ThreadPoolExecutor(max_workers=BATCH_SIZE) as ex: list(ex.map(analyze_image_chunk, batch)) if b + BATCH_SIZE < len(image_chunks): time.sleep(0.5) print() # Step 5 — write chunk files print("STEP 5: Writing chunk files...", flush=True) for chunk in all_chunks: write_chunk_file(chunk) print(f" Written {len(all_chunks)} chunk files.", flush=True) print() # Step 6 — write index print("STEP 6: Writing _index.json...", flush=True) write_index_json(all_chunks, build_at) print() # Step 7 — write document.md print("STEP 7: Writing document.md...", flush=True) doc_bytes = write_document_md(all_chunks, build_at) print() wall = int(time.time() - t0) num_images = len(image_chunks) num_ufo = len([c for c in all_chunks if c.get("ufo_anomaly_detected")]) num_cryptid = len([c for c in all_chunks if c.get("cryptid_anomaly_detected")]) print("=== DONE ===", flush=True) print( f"pages_done={TOTAL_PAGES}, chunks_total={len(all_chunks)}, " f"images_extracted={num_images}, tables_stitched=0, " f"ufo_anomalies={num_ufo}, cryptid_anomalies={num_cryptid}, " f"wall_seconds={wall}", flush=True, ) if __name__ == "__main__": main()