disclosure-bureau/investigator-runtime/src/orchestrator.ts

113 lines
4.1 KiB
TypeScript
Raw Normal View History

W3.1-W3.4: Investigation Bureau foundation — migrations, runtime, Locard Migrations: - 0004_investigation_bureau.sql: 7 new tables (investigation_jobs + evidence, hypotheses, contradictions, witnesses, gaps, residual_uncertainties), id sequences, pg_notify trigger on investigation_jobs, RLS read-only public, investigator role with least-privilege grants (no service_role). - 0005_investigator_write_policies.sql: fixup adding RLS INSERT/UPDATE policies bound to investigator + service_role + postgres (RLS with only a SELECT policy was silently blocking the worker's claim UPDATE). investigator-runtime/ (new Bun + TS container): - src/main.ts: LISTEN/NOTIFY poller, claim-with-SKIP-LOCKED, drain pool, healthcheck file, graceful SIGTERM shutdown. - src/orchestrator.ts: chief-detective dispatch (evidence_chain → Locard). Marks job failed when all per-item outputs error; surfaces first errors. - src/lib/{env,pg,audit,ids,claude}.ts: typed config (gate #8), pool + dedicated LISTEN client, NDJSON audit, sequence allocator (E-NNNN etc), claude -p subprocess with quota detection (api_error_status=429). - src/tools/write_evidence.ts: schema-validate (grade A/B/C custody steps), resolve chunk_pk via FK, verify verbatim_excerpt actually appears in chunk content, INSERT + render case/evidence/E-NNNN.md + audit. - src/detectives/locard.ts: load chunk → call Claude with locard.md system prompt → parse strict JSON → call writeEvidence locally. - Dockerfile installs `claude` CLI (OAuth) at build time. Compose: - new `investigator` service builds from investigator-runtime/, connects with low-privilege role, mounts case/ RW and wiki/+raw/ RO, 512m mem cap. Web: - /api/admin/investigate/test (POST+GET) gated by middleware (W0-F1). POST creates a job, GET polls status. For W3.6 it becomes the chat tool. End-to-end smoke: INSERT job → pg_notify → claim → Locard dispatch → claude subprocess invoked. Auth works (CLI v2.1.150). 
Currently quota exhausted (weekly limit · resets 3pm UTC) — pipeline catches the typed isQuota error, marks job failed with surfaced reason. Architecture proven; quota reset enables real evidence creation. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 22:49:33 +00:00
/**
* orchestrator.ts — chief-detective. Decides which detective runs for a job.
*
* For W3.1–W3.4 we only know `evidence_chain` (Locard). Other kinds enter the
* registry as we build each detective in W3.5+. Unknown kinds fail the job
* loudly so we don't quietly drop work.
*/
import { audit } from "./lib/audit";
import { query } from "./lib/pg";
import { runLocard, type LocardTask } from "./detectives/locard";
/**
 * One claimed job, in the shape `dispatch` consumes.
 */
export interface InvestigationJob {
  /** Identifier used to address the job's row in `public.investigation_jobs`. */
  job_id: string;

  /** Selects the detective to run; `dispatch` fails the job on kinds it does not know. */
  kind: string;

  /**
   * Kind-specific arguments. For `evidence_chain`, `dispatch` reads `doc_id`
   * (required), `chunks` (optional array) and `claim` (optional string).
   */
  payload: Record<string, unknown>;

  /** Not read in this module — presumably the originator of the job; confirm against the jobs table. */
  triggered_by: string | null;
}
/** How many chunks are auto-selected when an `evidence_chain` payload lists none. */
const DEFAULT_EVIDENCE_CANDIDATES = 5;

/**
 * Convert any thrown value into a non-empty message string.
 *
 * `catch` hands us `unknown`: code can throw strings or plain objects, whose
 * `.message` is `undefined`. An `undefined` error would be dropped by
 * `JSON.stringify`, making a failed item look like a success.
 */
function describeError(e: unknown): string {
  if (e instanceof Error) return e.message || e.name;
  return String(e);
}

/**
 * Run the detective that matches `job.kind` and persist the outcome.
 *
 * Job-level failures (unknown kind, missing `doc_id`, no candidate chunks, a
 * failing status write) are recorded on the job row as `failed`; they do not
 * reject. The returned promise rejects only when the fallback `failed` write
 * or an audit write itself throws.
 *
 * @param job      The claimed job to process.
 * @param workerId Identifier of the worker, recorded in the `job_claimed` audit event.
 */
export async function dispatch(job: InvestigationJob, workerId: string): Promise<void> {
  await audit({ event: "job_claimed", job_id: job.job_id, kind: job.kind, worker_id: workerId });

  const outputs: unknown[] = [];
  let allErrors = false;
  let summary: string | null = null;
  let fatal: string | null = null;

  try {
    switch (job.kind) {
      case "evidence_chain": {
        // Payload shape: { doc_id, chunks?: [chunk_ids], claim?: string }.
        // Without `chunks` we fall back to the top DEFAULT_EVIDENCE_CANDIDATES
        // candidates chosen by pickEvidenceCandidates.
        const docId = String(job.payload.doc_id ?? "");
        if (!docId) throw new Error("evidence_chain requires payload.doc_id");

        // NOTE(review): entries of payload.chunks are not validated here;
        // they are passed to the detective as-is.
        const chunkIds = Array.isArray(job.payload.chunks)
          ? (job.payload.chunks as string[])
          : await pickEvidenceCandidates(docId, DEFAULT_EVIDENCE_CANDIDATES);
        if (chunkIds.length === 0) throw new Error(`no candidate chunks in ${docId}`);

        // The claim is the same for every chunk, so resolve it once.
        const claim = typeof job.payload.claim === "string" ? job.payload.claim : undefined;

        // Chunks run one at a time; a failing chunk is recorded and the loop
        // continues so one bad chunk cannot sink the whole job.
        for (const chunk_id of chunkIds) {
          const task: LocardTask = { job_id: job.job_id, doc_id: docId, chunk_id, claim };
          try {
            const r = await runLocard(task);
            outputs.push({ chunk_id, ...r });
          } catch (e) {
            outputs.push({ chunk_id, error: describeError(e) });
          }
        }
        break;
      }
      default:
        throw new Error(`unknown_kind: ${job.kind}`);
    }

    // Status reflects reality: if every per-item attempt errored the job is
    // marked failed (so the UI doesn't say "complete" when nothing useful was
    // produced); if at least one succeeded it stays `complete` with the mixed
    // outputs payload.
    allErrors =
      outputs.length > 0 &&
      outputs.every(
        (o): o is { error: string } => typeof (o as { error?: unknown }).error === "string",
      );

    // First few error messages, surfaced to the user via the jobs table.
    summary = allErrors
      ? outputs
          .map((o) => (o as { error?: string }).error)
          .filter((msg): msg is string => Boolean(msg))
          .slice(0, 3)
          .join(" | ")
      : null;

    await query(
      [
        "UPDATE public.investigation_jobs",
        "SET status = $1, finished_at = NOW(), outputs = $2::jsonb, error = $3",
        "WHERE job_id = $4",
      ].join("\n"),
      [allErrors ? "failed" : "complete", JSON.stringify(outputs), summary, job.job_id],
    );
  } catch (e) {
    fatal = describeError(e);
    await query(
      [
        "UPDATE public.investigation_jobs",
        "SET status = 'failed', finished_at = NOW(), error = $1, outputs = $2::jsonb",
        "WHERE job_id = $3",
      ].join("\n"),
      [fatal, JSON.stringify(outputs), job.job_id],
    );
  }

  // Audit only after the final status is written, and outside the try: an
  // audit failure must not overwrite a row already marked `complete`.
  if (fatal !== null) {
    await audit({ event: "job_failed", job_id: job.job_id, kind: job.kind, error: fatal });
    return;
  }
  await audit({
    event: allErrors ? "job_failed_all_items" : "job_completed",
    job_id: job.job_id,
    kind: job.kind,
    n_outputs: outputs.length,
    ...(summary ? { summary } : {}),
  });
}
/**
 * Choose up to `limit` chunk ids from a document that are worth sending to a
 * detective — searchable chunks whose text (English if present, otherwise
 * Portuguese) is longer than 200 characters.
 *
 * Results are ordered by `ufo_anomaly` descending, then by page and position
 * within the page, so anomaly-flagged chunks are extracted first.
 * NOTE(review): PostgreSQL sorts NULLs first under DESC — confirm
 * `ufo_anomaly` is NOT NULL, otherwise unflagged-unknown rows lead the list.
 *
 * @param doc_id Document whose chunks are scanned.
 * @param limit  Maximum number of chunk ids to return.
 * @returns Chunk ids in extraction order; empty when nothing qualifies.
 */
async function pickEvidenceCandidates(doc_id: string, limit: number): Promise<string[]> {
  // Joined with "\n" so the statement text sent to the database is unchanged.
  const candidateSql = [
    "SELECT chunk_id",
    "FROM public.chunks",
    "WHERE doc_id = $1",
    "AND is_searchable",
    "AND LENGTH(COALESCE(content_en, content_pt, '')) > 200",
    "ORDER BY ufo_anomaly DESC, page ASC, order_in_page ASC",
    "LIMIT $2",
  ].join("\n");

  const candidates = await query<{ chunk_id: string }>(candidateSql, [doc_id, limit]);
  return candidates.map(({ chunk_id }) => chunk_id);
}