322 lines
11 KiB
Python
Executable file
322 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
15-consolidate-tables.py — Multi-page table consolidation
|
|
|
|
For each document, walks pages in order and stitches together tables that
|
|
span multiple pages (detected by Haiku's `tables_detected[]` flags
|
|
`spans_multi_page`, `continues_from_prev_page`, `likely_continues_next_page`).
|
|
|
|
Output:
|
|
- wiki/tables/<TBL-<DOCSHORT>-<NNNN>>.md per consolidated multi-page table
|
|
- Updates each page.md's tables_detected[].table_id to reference the new TBL id
|
|
(so pages can cross-link to the master table.md)
|
|
|
|
Single-page tables stay inline in page.md (no separate table.md). Only spanning
|
|
tables get promoted.
|
|
|
|
Algorithm:
|
|
1. For each doc-id directory under wiki/pages/, sort pages by page_number.
|
|
2. Maintain a list of "open" tables (started, not yet ended).
|
|
3. For each page's tables in order:
|
|
     a. If table has `continues_from_prev_page=true` and there's an open table,
        append this page as a span of the oldest open table (simple FIFO;
        bbox/column-count compatibility is not checked).
        If `likely_continues_next_page=false`, finalize.
|
|
b. Otherwise start a new table. If `likely_continues_next_page=false` and
|
|
`spans_multi_page=false`, single-page → skip (don't promote).
|
|
Otherwise add to open tables.
|
|
4. Finalize each open table at end-of-doc.
|
|
5. Write wiki/tables/<id>.md and inject `table_id` back into each page's
|
|
tables_detected entry.
|
|
|
|
Idempotent: writes only when content changes.
|
|
|
|
Usage:
|
|
./15-consolidate-tables.py
|
|
./15-consolidate-tables.py --doc-id <id>
|
|
./15-consolidate-tables.py --force
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
sys.stderr.write("Missing pyyaml. pip3 install pyyaml\n")
|
|
sys.exit(1)
|
|
|
|
|
|
# Absolute root of the archive; every other path below is derived from it.
UFO_ROOT = Path("/Users/guto/ufo")
# One sub-directory per document, each holding that document's p*.md pages.
PAGES_BASE = UFO_ROOT.joinpath("wiki", "pages")
# Destination directory for the consolidated TBL-*.md records.
TABLES_BASE = UFO_ROOT.joinpath("wiki", "tables")
# Markdown log that main() appends a run summary to.
LOG_PATH = UFO_ROOT.joinpath("wiki", "log.md")

# Written into each table record as `schema_version` / `wiki_version`.
SCHEMA_VERSION = "0.1.0"
WIKI_VERSION = "0.1.0"
|
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second resolution)."""
    now = datetime.now(tz=timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def read_md(path: Path) -> tuple[dict, str]:
    """Split a Markdown file into its YAML front matter and its body.

    Args:
        path: File to read (decoded as UTF-8).

    Returns:
        ``(front_matter, body)``. ``front_matter`` is ``{}`` when the file has
        no opening ``---``, has no closing fence, holds invalid YAML, or holds
        YAML that is not a mapping. ``body`` is the text after the closing
        fence with leading newlines removed; when no front matter block is
        recognised at all, the whole file content is returned as the body.
    """
    text = path.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return {}, text

    lines = text.split("\n")
    # The closing fence has to be a line of its own. Searching for a bare
    # "---" substring would cut the header short whenever a YAML value
    # (a title, a headers summary, ...) happens to contain "---".
    close_idx = None
    for i, line in enumerate(lines):
        if i and line.rstrip() == "---":
            close_idx = i
            break
    if close_idx is None:
        return {}, text

    # Drop the three opening dashes; anything else on that line stays part of
    # the header text, as before.
    header = "\n".join(lines[:close_idx])[3:].strip()
    body = "\n".join(lines[close_idx + 1:]).lstrip("\n")
    try:
        meta = yaml.safe_load(header)
    except yaml.YAMLError:
        return {}, body
    # Callers use dict methods on the result, so a scalar or list document is
    # treated the same as missing front matter.
    return (meta if isinstance(meta, dict) else {}), body
|
|
|
|
|
|
def write_md(path: Path, fm: dict, body: str) -> bool:
    """Write *fm* as YAML front matter followed by *body*, only if the file would change.

    Parent directories are created as needed.

    Returns:
        ``True`` when the file was written, ``False`` when it already held
        exactly the rendered content.
    """
    front = yaml.dump(fm, allow_unicode=True, sort_keys=False, default_flow_style=False)
    # Keep exactly one blank line between the closing fence and the body.
    separator = "" if body.startswith("\n") else "\n"
    rendered = f"---\n{front}---\n{separator}{body}"

    if path.exists():
        if path.read_text(encoding="utf-8") == rendered:
            return False

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(rendered, encoding="utf-8")
    return True
|
|
|
|
|
|
def doc_short_id(doc_id: str) -> str:
    """Derive the compact uppercase DOCSHORT part of a ``TBL-<DOCSHORT>-NNNN`` id.

    The id is upper-cased, stripped of everything but ``A-Z``/``0-9``, relieved
    of the first matching well-known prefix, and cut to 8 characters. ``"X"``
    is returned when nothing is left.
    """
    known_prefixes = ("DOWUAP", "DOSUAP", "NASAUAP", "FBIPHOTO", "DOC")
    compact = re.sub(r"[^A-Z0-9]", "", doc_id.upper())
    # Only the first prefix that matches (in the order listed) is removed.
    leading = next((p for p in known_prefixes if compact.startswith(p)), "")
    compact = compact[len(leading):]
    truncated = compact[:8]
    return truncated if truncated else "X"
|
|
|
|
|
|
def list_doc_dirs() -> list[Path]:
    """Return the sub-directories of ``PAGES_BASE`` in sorted order.

    An empty list is returned when ``PAGES_BASE`` does not exist.
    """
    if PAGES_BASE.exists():
        subdirs = [entry for entry in PAGES_BASE.iterdir() if entry.is_dir()]
        subdirs.sort()
        return subdirs
    return []
|
|
|
|
|
|
def _new_table() -> dict:
    """Return an empty accumulator for one (potentially multi-page) table."""
    return {
        # [{page_num, page_id, bbox, role, page_idx, entry}]
        "spans": [],
        "headers_summaries": [],
        "row_count_estimates": [],
        "col_count_estimates": [],
    }


def _close_tables(tables: list[dict], finalized: list[dict]) -> None:
    """Mark the last span of each table as ``end`` and move it to *finalized*."""
    for tbl in tables:
        if tbl["spans"]:
            tbl["spans"][-1]["role"] = "end"
        finalized.append(tbl)


def _load_pages(doc_dir: Path) -> list[dict]:
    """Read every ``p<N>*.md`` page of *doc_dir*, ordered by numeric page number."""
    doc_id = doc_dir.name
    pages = []
    for path in doc_dir.glob("p*.md"):
        m = re.match(r"p(\d+)", path.stem)
        if not m:
            continue
        fm, body = read_md(path)
        if not fm:
            continue
        page_num = int(m.group(1))
        tables = fm.get("tables_detected")
        pages.append({
            "path": path,
            "page_num": page_num,
            "page_id": fm.get("page_id", f"{doc_id}/p{page_num:03d}"),
            # Anything that is not a list cannot be walked entry by entry.
            "tables": tables if isinstance(tables, list) else [],
            "fm": fm,
            "body": body,
        })
    # Sort numerically: a plain filename sort puts p10 before p2 (and p1000
    # before p999), which would stitch tables across the wrong pages.
    pages.sort(key=lambda page: (page["page_num"], page["path"].name))
    return pages


def _stitch_tables(pages_data: list[dict]) -> list[dict]:
    """Walk pages in order and chain table entries that continue across pages.

    A continuation entry is attached to the oldest table still open from the
    previous page (FIFO); no bbox or column-count comparison is made.
    Returns every table that was started, including ones that ended up with a
    single span.
    """
    open_tables: list[dict] = []
    finalized: list[dict] = []
    for page_idx, page in enumerate(pages_data):
        carried = []  # tables flagged as continuing onto the next page
        for entry in page["tables"]:
            if not isinstance(entry, dict):
                continue
            continues_from = bool(entry.get("continues_from_prev_page"))
            likely_continues = bool(entry.get("likely_continues_next_page"))
            spans_multi = bool(entry.get("spans_multi_page"))

            if continues_from and open_tables:
                tbl = open_tables.pop(0)
            elif spans_multi or likely_continues:
                tbl = _new_table()
            else:
                # Single-page table: stays inline in page.md.
                continue

            if not tbl["spans"]:
                role = "start"
            else:
                role = "middle" if likely_continues else "end"
            tbl["spans"].append({
                "page_num": page["page_num"],
                "page_id": page["page_id"],
                "bbox": entry.get("bbox") or {},
                "role": role,
                # Bookkeeping for the back-reference step; never serialized.
                "page_idx": page_idx,
                "entry": entry,
            })
            for src, dst in (
                ("headers_summary", "headers_summaries"),
                ("row_count_estimate", "row_count_estimates"),
                ("col_count_estimate", "col_count_estimates"),
            ):
                if entry.get(src):
                    tbl[dst].append(entry[src])

            (carried if likely_continues else finalized).append(tbl)

        # Tables open from the previous page that nothing here continued
        # (including when this page has no tables at all) end where they are.
        _close_tables(open_tables, finalized)
        open_tables = carried

    # End of document: whatever is still open ends on its last page.
    _close_tables(open_tables, finalized)
    return finalized


def _render_table(tbl_id: str, idx: int, doc_id: str, tbl: dict) -> tuple[dict, str]:
    """Build the front matter and Markdown body of one consolidated table record."""
    spans = tbl["spans"]
    # Canonical headers: first non-empty summary seen on any page.
    headers = next((h for h in tbl["headers_summaries"] if h), "")
    fm = {
        "schema_version": SCHEMA_VERSION,
        "type": "table",
        "table_id": tbl_id,
        "canonical_title": (headers or f"Multi-page table {idx} of {doc_id}")[:200],
        "source_doc": f"[[{doc_id}]]",
        "multi_page": True,
        "page_count": len(spans),
        "spans_pages": [
            {"page": f"[[{sp['page_id']}]]", "bbox": sp["bbox"], "role": sp["role"]}
            for sp in spans
        ],
        "headers_summary": headers,
        "total_rows_estimate": max(tbl["row_count_estimates"], default=0),
        "total_columns_estimate": max(tbl["col_count_estimates"], default=0),
        "extraction_quality": None,  # to be set when actually extracted to CSV
        "last_ingest": utc_now_iso(),
        "wiki_version": WIKI_VERSION,
    }
    parts = [
        f"# {fm['canonical_title']}\n\n",
        f"> Multi-page table spanning {len(spans)} pages of [[{doc_id}]]\n\n",
        "## Pages\n\n",
    ]
    parts.extend(
        f"- {sp['role']}: [[{sp['page_id']}]] · bbox {sp['bbox']}\n" for sp in spans
    )
    parts.append("\n## Headers\n\n")
    parts.append(f"{headers or '_(not extracted)_'}\n\n")
    parts.append("## Notes\n\n")
    parts.append(
        "Per-page table snippets live in each page.md's `tables_detected[]`. This consolidated record stitches them together. Full data extraction (row-by-row CSV) is deferred to a future enrichment pass.\n"
    )
    return fm, "".join(parts)


def _keep_ingest_stamp(out: Path, fm: dict, body: str) -> None:
    """Reuse the stored ``last_ingest`` in *fm* when nothing else differs from *out*."""
    if not out.exists():
        return
    old_fm, old_body = read_md(out)
    old_stamp = old_fm.get("last_ingest")
    if not isinstance(old_stamp, str):
        return
    if old_body != body.lstrip("\n"):
        return
    if {**old_fm, "last_ingest": None} == {**fm, "last_ingest": None}:
        fm["last_ingest"] = old_stamp


def consolidate_doc(doc_dir: Path, force: bool) -> int:
    """Stitch one document's multi-page tables and write ``wiki/tables/<id>.md``.

    Pages are processed in numeric page order. Every table that ends up
    spanning at least two pages gets a ``TBL-<DOCSHORT>-<NNNN>`` record under
    ``TABLES_BASE``, and that id is stored as ``table_id`` on exactly the
    ``tables_detected`` entries that were stitched into it.

    Args:
        doc_dir: Directory holding the document's ``p*.md`` pages; its name is
            used as the document id.
        force: When false, a record whose content is otherwise unchanged keeps
            its stored ``last_ingest`` stamp and is not rewritten. When true,
            a fresh stamp is always used, so existing records are rewritten.

    Returns:
        Number of table record files written (created or changed).
    """
    doc_id = doc_dir.name
    pages_data = _load_pages(doc_dir)
    if not pages_data:
        return 0

    # A table flagged as spanning pages but seen on only one page is dropped.
    multi_page = [t for t in _stitch_tables(pages_data) if len(t["spans"]) >= 2]
    if not multi_page:
        return 0

    short = doc_short_id(doc_id)
    n_written = 0
    dirty_pages: set[int] = set()
    for idx, tbl in enumerate(multi_page, start=1):
        tbl_id = f"TBL-{short}-{idx:04d}"
        fm, body = _render_table(tbl_id, idx, doc_id, tbl)
        out = TABLES_BASE / f"{tbl_id}.md"
        if not force:
            # Without this the fresh timestamp alone would make every record
            # look changed on every run.
            _keep_ingest_stamp(out, fm, body)
        if write_md(out, fm, body):
            n_written += 1

        # Each span remembers the page entry it came from, so the id lands on
        # that entry rather than on whichever flagged entry comes first.
        for sp in tbl["spans"]:
            entry = sp["entry"]
            if entry.get("table_id") != tbl_id:
                entry["table_id"] = tbl_id
                dirty_pages.add(sp["page_idx"])

    # The entries were mutated in place inside each page's front matter.
    for page_idx in sorted(dirty_pages):
        page = pages_data[page_idx]
        write_md(page["path"], page["fm"], page["body"])

    return n_written
|
|
|
|
|
|
def main():
    """Command-line entry point: consolidate tables for one document or for all.

    Options:
        --doc-id <id>  process only ``PAGES_BASE/<id>``
        --force        passed through to ``consolidate_doc``

    Prints a line per document that had table records written, then a total.
    When at least one record was written, a summary entry is appended to
    ``LOG_PATH``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--doc-id", help="single doc")
    ap.add_argument("--force", action="store_true")
    args = ap.parse_args()

    TABLES_BASE.mkdir(parents=True, exist_ok=True)
    targets = [PAGES_BASE / args.doc_id] if args.doc_id else list_doc_dirs()
    total_tables = 0
    docs_with_tables = 0
    for d in targets:
        if not d.is_dir():
            # Usually a mistyped --doc-id; say so instead of reporting a bare
            # "Total: 0". The run still continues and the exit status is
            # unchanged.
            sys.stderr.write(f"Skipping {d.name}: no page directory at {d}\n")
            continue
        n = consolidate_doc(d, args.force)
        if n > 0:
            print(f"  ✓ {d.name}: {n} multi-page table(s)")
            total_tables += n
            docs_with_tables += 1

    print(f"\nTotal: {total_tables} multi-page tables across {docs_with_tables} doc(s)")
    if total_tables > 0:
        with open(LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(
                f"\n## {utc_now_iso()} — CONSOLIDATE TABLES\n"
                f"- operator: archivist\n- script: scripts/15-consolidate-tables.py\n"
                f"- tables_written: {total_tables}\n- docs_with_tables: {docs_with_tables}\n"
            )


if __name__ == "__main__":
    main()
|