disclosure-bureau/web/lib/chat/openrouter.ts
Luiz Gustavo 55cac8a395
Some checks failed
CI / Web — typecheck + lint + build (push) Failing after 1m30s
CI / Scripts — Python smoke (push) Failing after 32s
CI / Web — npm audit (push) Failing after 37s
W0+W1+W1.2: security hardening, observability, autocomplete, glitchtip, forgejo CI
W0 — security hardening (5 fixes verified live on disclosure.top)
- middleware: gate /api/admin/* same as /admin/* (F1)
- imgproxy: tighten LOCAL_FILESYSTEM_ROOT from / to /var/lib/storage (F2)
- studio: real basic-auth label (bcrypt hash, middleware reference) (F3)
- relations: ENABLE ROW LEVEL SECURITY + public SELECT policy (F4)
- migration 0003: fold is_searchable + hybrid_search update into canonical (TD#2)

W1 — observability + resilience + autocomplete
- studio: HOSTNAME=0.0.0.0 so Next.js binds on all interfaces (including loopback) for the healthcheck
- compose: PG_POOL_MAX=20, CLAUDE_CODE_OAUTH_TOKEN gated by separate env
- claude-code.ts: subprocess timeout configurable (CLAUDE_CODE_TIMEOUT_MS)
- openrouter.ts: retry with exponential backoff + Retry-After + in-memory
  circuit breaker (promotes FALLBACK after CB_THRESHOLD failures)
- lib/logger.ts: pino logger (NDJSON prod / pretty dev) + withRequest helper
- middleware: mints correlation_id, stamps x-correlation-id response header,
  emits structured http_request log per /api/* call
- messages/route.ts: switch to structured logger
- 60_meili_index.py: push documents + chunks into Meilisearch
- /api/search/autocomplete: parallel meili search (docs + chunks), 5-8ms p50
- search-autocomplete.tsx: debounced dropdown wired into search-panel

W1.2 — Glitchtip + Forgejo self-hosted
- compose: glitchtip-redis + glitchtip-web + glitchtip-worker (v4.2)
- compose: forgejo + forgejo-runner (server v9, runner v6) with group_add=988
- @sentry/nextjs SDK wired (instrumentation.ts + sentry.{client,server}.config.ts)
- /api/admin/throw smoke endpoint (gated by W0-F1 middleware)
- Synthetic event ingestion verified at glitchtip.disclosure.top
- forgejo.disclosure.top up, repo discadmin/disclosure-bureau created,
  runner registered (labels: ubuntu-latest, docker)
- .forgejo/workflows/ci.yml: typecheck + lint + build + npm audit + python
  syntax + compose validation

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:18:42 -03:00

522 lines
18 KiB
TypeScript

/**
* OpenRouter provider — OpenAI-compatible chat completions.
*
* Exports:
* sendOnce() — non-streaming, single call (no tools)
* streamWithTools() — streaming + tool-call loop (Pattern C)
*
* The tool-call loop runs locally:
* 1. Call model with messages + tools, stream=true.
* 2. Read SSE deltas: text → emit `text_delta`; tool_calls → buffer args by id.
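* 3. When finish_reason='tool_calls': execute handlers locally, push results
* back as tool messages, call model again. Repeat until a round ends with any
* other finish_reason or maxTurns rounds (default 5) have run.
* 4. If no prose was produced but tools ran, force one extra round without
* tools; then emit only the artifacts cited in the final text. This module
* does not emit `done` itself — presumably the caller does (confirm).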
*
* Free models (May 2026):
* deepseek/deepseek-v4-flash:free — primary, solid tool calling
* nvidia/nemotron-3-super-120b-a12b:free — fallback
*/
import type { AGUIEvent } from "./agui";
import { TOOL_DEFINITIONS, TOOL_HANDLERS, type ToolHandlerContext } from "./tools";
/**
 * Read a numeric tuning knob from the environment.
 *
 * Returns `fallback` when the variable is unset, blank, not a finite number,
 * or negative. Without this, a typo such as OPENROUTER_RETRY_MAX="two" becomes
 * NaN, which silently disables retries (`attempt <= NaN` is never true) or the
 * circuit breaker (`failures.length >= NaN` is never true).
 */
function envNumber(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const value = Number(raw);
  return Number.isFinite(value) && value >= 0 ? value : fallback;
}
/** Preferred model; requested first unless its circuit is open. */
const PRIMARY = process.env.OPENROUTER_MODEL || "deepseek/deepseek-v4-flash:free";
/** Used when PRIMARY is rate-limited or its circuit breaker is open. */
const FALLBACK = process.env.OPENROUTER_FALLBACK_MODEL || "nvidia/nemotron-3-super-120b-a12b:free";
const ENDPOINT = "https://openrouter.ai/api/v1/chat/completions";
// W1-TD#23: retry + circuit breaker for OpenRouter free-tier flakiness.
// Transient errors (429/502/503/504/network) are retried up to RETRY_MAX times
// with exponential backoff. Repeated PRIMARY failures within CB_WINDOW_MS
// trip an in-memory circuit breaker that promotes FALLBACK as the active
// model for CB_COOLDOWN_MS — protecting the chat from a single bad model.
const RETRY_MAX = envNumber("OPENROUTER_RETRY_MAX", 2);
const RETRY_BASE_MS = envNumber("OPENROUTER_RETRY_BASE_MS", 400);
const CB_WINDOW_MS = envNumber("OPENROUTER_CB_WINDOW_MS", 60_000);
const CB_THRESHOLD = envNumber("OPENROUTER_CB_THRESHOLD", 3);
const CB_COOLDOWN_MS = envNumber("OPENROUTER_CB_COOLDOWN_MS", 120_000);
/** HTTP statuses treated as transient and retried with backoff. */
const RETRYABLE_STATUSES = new Set([408, 425, 429, 500, 502, 503, 504]);
/**
 * Circuit-breaker state for one model: timestamps (ms) of recent failures and
 * the moment the circuit opened, or null while it is closed.
 */
interface ModelBreaker {
  failures: number[];
  openedAt: number | null;
}
/** Breaker state keyed by model id, held in this module's memory. */
const breakers = new Map<string, ModelBreaker>();
/** Return `model`'s breaker, lazily registering a closed one on first use. */
function breakerFor(model: string): ModelBreaker {
  const existing = breakers.get(model);
  if (existing) return existing;
  const fresh: ModelBreaker = { failures: [], openedAt: null };
  breakers.set(model, fresh);
  return fresh;
}
/**
 * Whether `model`'s circuit is currently open.
 *
 * Once more than CB_COOLDOWN_MS has passed since it opened, the breaker is
 * reset (openedAt and failures cleared) and false is returned so the next call
 * probes the upstream. Note the reset fully closes the breaker: a failed probe
 * starts counting toward CB_THRESHOLD from zero again.
 */
function isCircuitOpen(model: string): boolean {
  const state = breakerFor(model);
  if (!state.openedAt) return false;
  const cooledDown = Date.now() - state.openedAt > CB_COOLDOWN_MS;
  if (!cooledDown) return true;
  state.openedAt = null;
  state.failures = [];
  return false;
}
/**
 * Record a failed call against `model`. Failures older than CB_WINDOW_MS are
 * discarded first; when CB_THRESHOLD or more remain, the circuit is (re)opened
 * at the current time.
 */
function recordFailure(model: string): void {
  const state = breakerFor(model);
  const now = Date.now();
  const recent = state.failures.filter((ts) => now - ts < CB_WINDOW_MS);
  recent.push(now);
  state.failures = recent;
  if (recent.length >= CB_THRESHOLD) state.openedAt = now;
}
/** A successful call closes `model`'s circuit and forgets all past failures. */
function recordSuccess(model: string): void {
  Object.assign(breakerFor(model), { failures: [], openedAt: null });
}
/**
 * Resolve which model to call: PRIMARY is swapped for FALLBACK while
 * PRIMARY's circuit is open; any other preference passes through unchanged.
 */
function pickModel(preferred: string): string {
  const divert = preferred === PRIMARY && isCircuitOpen(PRIMARY);
  return divert ? FALLBACK : preferred;
}
/**
 * POST a chat-completions request to OpenRouter with retry and
 * circuit-breaker accounting.
 *
 * The model is resolved through pickModel (PRIMARY → FALLBACK while PRIMARY's
 * circuit is open) and written into `body.model`, which mutates the caller's
 * object.
 *
 * Retry policy:
 * - network/abort errors from fetch() and RETRYABLE_STATUSES responses are
 *   retried up to RETRY_MAX times. The wait is Retry-After (in seconds) when
 *   present, otherwise RETRY_BASE_MS · 2^attempt;
 * - a Retry-After longer than MAX_WAIT_MS stops retrying at once, so callers
 *   can fall back instead of holding the request open;
 * - any other non-OK status fails fast, without retry.
 *
 * Each failed call is charged exactly once to the model's breaker; an OK
 * response closes it.
 *
 * @returns the OK response (body unread) and the model actually requested.
 * @throws Error `openrouter HTTP <status>: <body prefix>` for non-retryable
 *   statuses, or the last attempt's error once retries are exhausted. The error
 *   carries `isRateLimit: true` for HTTP 402, and for 429 after retries.
 */
async function fetchOpenRouter(
  body: Record<string, unknown>,
  preferredModel: string,
): Promise<{ res: Response; model: string }> {
  const MAX_WAIT_MS = 10_000;
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
  const markRateLimit = (err: Error): Error => {
    (err as Error & { isRateLimit?: boolean }).isRateLimit = true;
    return err;
  };

  const model = pickModel(preferredModel);
  body.model = model;
  // Built once, outside the loop: a missing API key (thrown by headers()) is a
  // configuration error, not a transient upstream failure, so never retry it.
  const init = { method: "POST", headers: headers(), body: JSON.stringify(body) };

  let lastErr: Error = new Error("openrouter: no request attempted (check OPENROUTER_RETRY_MAX)");
  let lastStatus: number | null = null;
  for (let attempt = 0; attempt <= RETRY_MAX; attempt++) {
    const backoffMs = RETRY_BASE_MS * 2 ** attempt;
    const isLastAttempt = attempt + 1 > RETRY_MAX;

    let res: Response;
    try {
      res = await fetch(ENDPOINT, init);
    } catch (e) {
      // Network/abort failure: transient, so retry. Only fetch() itself is
      // inside this try, so HTTP errors thrown below are never swallowed here.
      lastErr = e instanceof Error ? e : new Error(String(e));
      lastStatus = null;
      if (isLastAttempt) break;
      await sleep(backoffMs);
      continue;
    }

    if (res.ok) {
      recordSuccess(model);
      return { res, model };
    }

    if (!RETRYABLE_STATUSES.has(res.status)) {
      const txt = await res.text().catch(() => "");
      const err = new Error(`openrouter HTTP ${res.status}: ${txt.slice(0, 300)}`);
      recordFailure(model);
      throw res.status === 402 || res.status === 429 ? markRateLimit(err) : err;
    }

    // Retryable status: discard the unread body so the connection is released.
    await res.body?.cancel().catch(() => undefined);
    lastStatus = res.status;
    lastErr = new Error(`openrouter HTTP ${res.status} (attempt ${attempt + 1}/${RETRY_MAX + 1})`);
    if (isLastAttempt) break; // no point sleeping when no attempt follows
    const retryAfterSec = Number(res.headers.get("retry-after"));
    const waitMs = Number.isFinite(retryAfterSec) && retryAfterSec > 0
      ? retryAfterSec * 1000
      : backoffMs;
    // The server wants a longer pause than we will hold a chat request for.
    if (waitMs > MAX_WAIT_MS) break;
    await sleep(waitMs);
  }

  recordFailure(model);
  // 429 is retryable, so it only surfaces here; flag it so callers can switch
  // to FALLBACK.
  throw lastStatus === 429 ? markRateLimit(lastErr) : lastErr;
}
/** A tool invocation as echoed back to the model in an assistant message. */
type OAToolCall = {
  id: string;
  type: "function";
  function: { name: string; arguments: string };
};
/** One conversation message in OpenAI chat-completions format. */
type OAMsg =
  | { role: "system" | "user"; content: string }
  | { role: "assistant"; content?: string | null; tool_calls?: OAToolCall[] }
  | { role: "tool"; content: string; tool_call_id: string; name?: string };
/**
 * A streamed fragment of a tool call. Fragments sharing `index` belong to the
 * same call, and `function.arguments` arrives in pieces to be concatenated.
 */
type OAToolCallDelta = {
  index: number;
  id?: string;
  type?: string;
  function?: { name?: string; arguments?: string };
};
/** Incremental assistant output carried by one streamed choice. */
type OADelta = {
  role?: string;
  content?: string;
  tool_calls?: OAToolCallDelta[];
};
/**
 * One parsed payload from OpenRouter. Streamed chunks populate
 * `choices[].delta`; the `choices[].message` field models the non-streaming
 * shape.
 */
type OAStreamChunk = {
  model?: string;
  choices?: Array<{
    delta?: OADelta;
    message?: { role: string; content?: string | null; tool_calls?: OAToolCall[] };
    finish_reason?: string | null;
  }>;
  usage?: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number };
  error?: { message?: string };
};
/**
 * Build OpenRouter request headers: bearer auth, JSON content type, and the
 * optional HTTP-Referer / X-Title attribution headers.
 *
 * @throws Error when OPENROUTER_API_KEY is unset or empty.
 */
function headers() {
  const key = process.env.OPENROUTER_API_KEY;
  if (!key) {
    throw new Error("OPENROUTER_API_KEY not set");
  }
  const referer = process.env.NEXT_PUBLIC_SITE_URL || "https://disclosure.top";
  return {
    "Authorization": `Bearer ${key}`,
    "Content-Type": "application/json",
    "HTTP-Referer": referer,
    "X-Title": "The Disclosure Bureau",
  };
}
/** Request for sendOnce(): a system prompt plus the conversation so far. */
export interface SendOnceReq {
  system: string;
  /** Prior turns; only the last 20 entries are sent. */
  messages: Array<{ role: "user" | "assistant" | "system"; content: string }>;
  /** Completion cap sent as `max_tokens` (default 1024). */
  maxTokens?: number;
}
/**
 * Non-streaming single shot (no tools) — used by the claude-code fallback path
 * and tests.
 *
 * @param req system prompt, conversation (last 20 entries kept), token cap.
 * @param preferredModel model to request (default PRIMARY); fetchOpenRouter
 *   swaps PRIMARY for FALLBACK while PRIMARY's circuit is open.
 * @returns the first choice's content ("" when absent), the model reported by
 *   OpenRouter (or the one requested), and token usage when provided.
 * @throws on transport/HTTP failure (from fetchOpenRouter), on an unparseable
 *   response body, or when the payload carries an `error` object.
 */
export async function sendOnce(req: SendOnceReq, preferredModel = PRIMARY): Promise<{
  content: string;
  model: string;
  tokensIn?: number;
  tokensOut?: number;
}> {
  const body: Record<string, unknown> = {
    messages: [
      { role: "system", content: req.system },
      ...req.messages.slice(-20),
    ],
    max_tokens: req.maxTokens ?? 1024,
  };
  // fetchOpenRouter has already recorded breaker success for the OK HTTP
  // response; only payload-level failures still need charging to the model.
  const { res, model } = await fetchOpenRouter(body, preferredModel);
  let data: OAStreamChunk;
  try {
    data = await res.json();
  } catch (e) {
    recordFailure(model);
    throw e;
  }
  if (data.error) {
    recordFailure(model);
    throw new Error(`openrouter error: ${data.error.message}`);
  }
  return {
    content: data.choices?.[0]?.message?.content ?? "",
    model: data.model ?? model,
    tokensIn: data.usage?.prompt_tokens,
    tokensOut: data.usage?.completion_tokens,
  };
}
/* ─── Pattern C: streaming + tool-call loop ─────────────────────────────── */
/** Input for streamWithTools(): one user turn plus its surrounding context. */
export interface StreamRequest {
  /** System prompt, sent as the first message. */
  system: string;
  /** Prior turns, replayed in order between the system prompt and `userTurn`. */
  history: Array<{ role: "user" | "assistant"; content: string }>;
  /** The new user message for this turn. */
  userTurn: string;
  /** Context spread into every tool handler call. */
  ctx: ToolHandlerContext;
  /** Max tool-call loop iterations (streamWithTools defaults to 5). */
  maxTurns?: number;
}
/** Sink for the AG-UI events produced while streaming a turn. */
export interface StreamCallbacks {
  emit: (ev: AGUIEvent) => void;
}
/** Artifact type produced by tool handlers (declared in ./agui). */
type AguiArtifact = import("./agui").Artifact;
/**
 * Stream one assistant turn with a local tool-call loop (Pattern C).
 *
 * Each round streams the model's reply through `cb.emit` as `text_delta`
 * events. When a round finishes with `finish_reason === "tool_calls"`, each
 * requested tool runs locally. This emits `tool_start` and `tool_result`
 * events, plus `navigate` for `navigate_to` calls whose target starts with
 * "/". The tool's JSON result, truncated to 8000 chars, is appended as a
 * `tool` message and the model is called again. At most `req.maxTurns` rounds
 * run (default 5). A rate-limited round is retried once on FALLBACK.
 *
 * If the loop ends with no prose but tools were called, one extra tool-less
 * round asks the model to answer in plain text. A failure in that round is
 * reported as an `error` event rather than thrown.
 *
 * Artifacts produced by tools are held back while tools run. They are emitted
 * once at the end, filtered to those cited in the final prose and
 * de-duplicated (see selectCitedArtifacts).
 *
 * The caller is responsible for closing the SSE stream after this resolves.
 *
 * @returns the assembled assistant text, the model that served the last
 *   round, summed token usage, a trace of every tool call, and exactly the
 *   artifacts that were emitted.
 * @throws when a round's request fails (after retries/fallback), a response
 *   has no body, or the stream reports an error.
 */
export async function streamWithTools(
  req: StreamRequest,
  cb: StreamCallbacks,
): Promise<{
  content: string;
  model: string;
  tokensIn: number;
  tokensOut: number;
  toolCalls: Array<{ name: string; args: Record<string, unknown>; result: unknown }>;
  artifacts: import("./agui").Artifact[];
}> {
  const maxTurns = req.maxTurns ?? 5;
  // Artifacts are buffered, not streamed: which ones are relevant is only
  // known once the final prose exists and its [[...]] citations can be read.
  const collectedArtifacts: AguiArtifact[] = [];
  const sink: StreamCallbacks = {
    emit: (ev) => {
      if (ev.type === "artifact") {
        collectedArtifacts.push(ev.artifact);
        return;
      }
      cb.emit(ev);
    },
  };
  const messages: OAMsg[] = [
    { role: "system", content: req.system },
    ...req.history.map((m): OAMsg => ({ role: m.role, content: m.content })),
    { role: "user", content: req.userTurn },
  ];
  let assembledText = "";
  let totalIn = 0;
  let totalOut = 0;
  let modelUsed = PRIMARY;
  const toolTrace: Array<{ name: string; args: Record<string, unknown>; result: unknown }> = [];

  for (let turn = 0; turn < maxTurns; turn++) {
    // Resolve the breaker here so `modelUsed` names the model that actually
    // served this round (fetchOpenRouter would otherwise swap it silently).
    let model = pickModel(PRIMARY);
    let res: Response;
    try {
      res = await openrouterStreamCall(messages, model);
    } catch (e) {
      const rateLimited = (e as { isRateLimit?: boolean } | null)?.isRateLimit === true;
      // Retrying the very model that just failed would be pointless.
      if (!rateLimited || model === FALLBACK) throw e;
      model = FALLBACK;
      res = await openrouterStreamCall(messages, model);
    }
    modelUsed = model;
    if (!res.body) throw new Error("openrouter: no response body");
    const round = await readSSE(res.body, sink);
    assembledText += round.roundText;
    totalIn += round.usage?.prompt_tokens ?? 0;
    totalOut += round.usage?.completion_tokens ?? 0;

    // Anything other than a non-empty tool_calls request ends the loop.
    if (round.finishReason !== "tool_calls" || round.toolCalls.length === 0) break;

    // Record the assistant's tool-call turn before appending results.
    messages.push({
      role: "assistant",
      content: round.roundText || null,
      tool_calls: round.toolCalls,
    });

    for (const tc of round.toolCalls) {
      const name = tc.function.name;
      const argsObj = parseToolArgs(tc.function.arguments);
      sink.emit({ type: "tool_start", id: tc.id, name, args: argsObj });
      const handler = TOOL_HANDLERS[name];
      // Per-tool artifact sink: artifacts flow into the buffer above.
      const ctxWithSink: ToolHandlerContext = {
        ...req.ctx,
        emitArtifact: (artifact) => sink.emit({ type: "artifact", artifact }),
      };
      const t0 = Date.now();
      let result: unknown;
      try {
        if (!handler) throw new Error(`unknown tool: ${name}`);
        result = await handler(argsObj, ctxWithSink);
      } catch (e) {
        result = { error: e instanceof Error ? e.message : String(e) };
      }
      sink.emit({ type: "tool_result", id: tc.id, result, durationMs: Date.now() - t0 });
      toolTrace.push({ name, args: argsObj, result });

      // Surface navigate_to as a UI event the frontend can render as a button.
      if (name === "navigate_to") {
        const target = String(argsObj.target ?? "");
        const label = String(argsObj.label ?? target);
        if (target.startsWith("/")) {
          sink.emit({ type: "navigate", target, label });
        }
      }

      messages.push({
        role: "tool",
        tool_call_id: tc.id,
        name,
        content: serializeToolResult(result),
      });
    }
  }

  // Forced synthesis: free-tier models often exhaust the tool-call budget
  // without producing prose. One more round without tools forces a text answer.
  if (!assembledText.trim() && toolTrace.length > 0) {
    // NOTE(review): empty delta, presumably a keep-alive for the client; confirm.
    sink.emit({
      type: "text_delta",
      delta: "",
    });
    messages.push({
      role: "user",
      content:
        "Com base nas ferramentas que você acabou de chamar e nos resultados acima, " +
        "responda agora ao usuário EM TEXTO (3-8 frases), em português brasileiro. " +
        "Cite os chunks no formato [[doc-id/p007#c0042]]. Não chame mais nenhuma ferramenta.",
    });
    try {
      const res = await openrouterStreamCall(messages, modelUsed, { withTools: false });
      if (res.body) {
        const final = await readSSE(res.body, sink);
        assembledText += final.roundText;
        totalIn += final.usage?.prompt_tokens ?? 0;
        totalOut += final.usage?.completion_tokens ?? 0;
      }
    } catch (e) {
      sink.emit({ type: "error", message: `synthesis failed: ${e instanceof Error ? e.message : String(e)}` });
    }
  }

  // Emit (directly, bypassing the buffer) only the cited, de-duplicated cards.
  const relevantArtifacts = selectCitedArtifacts(collectedArtifacts, assembledText);
  for (const artifact of relevantArtifacts) {
    cb.emit({ type: "artifact", artifact });
  }
  return {
    content: assembledText,
    model: modelUsed,
    tokensIn: totalIn,
    tokensOut: totalOut,
    toolCalls: toolTrace,
    artifacts: relevantArtifacts,
  };
}
/**
 * Parse a tool call's JSON arguments. Empty, malformed, or non-object JSON
 * (e.g. `null`, an array, a number) yields `{}`, so handlers and the
 * navigate_to branch always receive an object.
 */
function parseToolArgs(raw: string | undefined): Record<string, unknown> {
  if (!raw) return {};
  try {
    const parsed: unknown = JSON.parse(raw);
    return parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)
      ? (parsed as Record<string, unknown>)
      : {};
  } catch {
    return {}; // malformed JSON from the model
  }
}
/**
 * Serialize a tool result for the model, truncated to `maxLen` characters.
 * Never throws: `undefined` becomes "null", and results JSON cannot encode
 * (cycles, BigInt) become an error object.
 */
function serializeToolResult(result: unknown, maxLen = 8000): string {
  let json: string | undefined;
  try {
    json = JSON.stringify(result);
  } catch (e) {
    json = JSON.stringify({
      error: `unserializable tool result: ${e instanceof Error ? e.message : String(e)}`,
    });
  }
  return (json ?? "null").slice(0, maxLen);
}
/**
 * Collect citations from `[[...]]` wiki links in `text`. Supported forms are
 * [[doc-id]], [[doc-id/p007]], [[doc-id#c0042]] and [[doc-id/p007#c0042]],
 * each with an optional `|alias`.
 * Returns lowercased trailing chunk anchors (cNNNN) and leading doc ids.
 */
function extractCitations(text: string): { chunks: Set<string>; docs: Set<string> } {
  const chunks = new Set<string>();
  const docs = new Set<string>();
  const wikiLinkRe = /\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]/g;
  for (let m = wikiLinkRe.exec(text); m !== null; m = wikiLinkRe.exec(text)) {
    const target = (m[1] ?? "").trim();
    const chunk = /(c\d{4})$/i.exec(target);
    if (chunk) chunks.add(chunk[1].toLowerCase());
    // Doc id is whatever precedes the first `/` or `#`.
    const doc = /^([A-Za-z0-9._-]+)/.exec(target);
    if (doc) docs.add(doc[1]);
  }
  return { chunks, docs };
}
/**
 * Keep only artifacts backed by the citations in `text`, de-duplicated in
 * first-seen order:
 * - citation: kept if its chunk is cited, or if its doc is cited and `text`
 *   cites no chunk anchors at all;
 * - crop_image: kept only if its chunk is cited;
 * - any other kind (entity/case/hypothesis/nav cards have no chunk
 *   reference): always kept.
 */
function selectCitedArtifacts(artifacts: AguiArtifact[], text: string): AguiArtifact[] {
  const { chunks, docs } = extractCitations(text);
  const seen = new Set<string>();
  const kept: AguiArtifact[] = [];
  for (const a of artifacts) {
    let key: string;
    if (a.kind === "citation") {
      const chunk = a.chunk_id.toLowerCase();
      if (!chunks.has(chunk) && !(chunks.size === 0 && docs.has(a.doc_id))) continue;
      key = `cit:${chunk}`;
    } else if (a.kind === "crop_image") {
      if (!a.chunk_id || !chunks.has(a.chunk_id.toLowerCase())) continue;
      key = `crop:${a.chunk_id.toLowerCase()}`;
    } else {
      key = `${a.kind}:${JSON.stringify(a)}`;
    }
    if (seen.has(key)) continue;
    seen.add(key);
    kept.push(a);
  }
  return kept;
}
/**
 * Open one streaming chat-completions round against OpenRouter.
 *
 * @param messages the full conversation so far (system, history, tool results).
 * @param preferredModel model to request; fetchOpenRouter may swap PRIMARY for
 *   FALLBACK while PRIMARY's circuit is open.
 * @param opts.withTools when false, `tools` / `tool_choice` are omitted so the
 *   model must answer in plain text (defaults to true).
 * @returns the raw streaming Response, body not yet read.
 */
async function openrouterStreamCall(
  messages: OAMsg[],
  preferredModel: string,
  opts: { withTools?: boolean } = {},
): Promise<Response> {
  const toolFields = opts.withTools === false
    ? {}
    : { tools: TOOL_DEFINITIONS, tool_choice: "auto" };
  const body: Record<string, unknown> = {
    messages,
    stream: true,
    max_tokens: 1024,
    ...toolFields,
  };
  const outcome = await fetchOpenRouter(body, preferredModel);
  return outcome.res;
}
/**
 * Consume one OpenRouter SSE response (a single round).
 *
 * Each `data:` line is parsed as a chat-completions chunk:
 * - content deltas are appended to the round text and forwarded as
 *   `text_delta` events via `cb.emit`;
 * - tool-call fragments are accumulated per `index` and assembled by
 *   collectToolCalls;
 * - the last `finish_reason` and `usage` seen are kept.
 *
 * Parsing stops at the `[DONE]` sentinel or at end of stream. Non-`data:`
 * lines (e.g. SSE comments) and unparseable payloads are skipped. CRLF framing
 * is accepted, and a final event without a trailing blank line is still
 * processed. The reader is cancelled on every exit path, so the upstream
 * connection is released even on an early return or an error.
 *
 * @throws Error `openrouter stream error: ...` when a chunk carries `error`.
 */
async function readSSE(
  body: ReadableStream<Uint8Array>,
  cb: StreamCallbacks,
): Promise<{
  roundText: string;
  finishReason: string | null;
  toolCalls: OAToolCall[];
  usage?: { prompt_tokens?: number; completion_tokens?: number };
}> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let roundText = "";
  let finishReason: string | null = null;
  // tool_calls arrive as deltas across many chunks; accumulate by index.
  const toolBufs: Record<number, { id: string; name: string; args: string }> = {};
  let usage: { prompt_tokens?: number; completion_tokens?: number } | undefined;

  const snapshot = () => ({ roundText, finishReason, toolCalls: collectToolCalls(toolBufs), usage });

  /** Process one SSE event block; returns true when it holds the [DONE] sentinel. */
  const handleEvent = (raw: string): boolean => {
    for (const line of raw.split("\n")) {
      // The SSE spec allows "data:" with or without a following space.
      if (!line.startsWith("data:")) continue;
      const payload = line.slice(5).trim();
      if (payload === "[DONE]") return true;
      let chunk: OAStreamChunk;
      try {
        chunk = JSON.parse(payload);
      } catch {
        continue; // non-JSON payload: skip it
      }
      if (chunk.error) throw new Error(`openrouter stream error: ${chunk.error.message}`);
      if (chunk.usage) usage = chunk.usage;
      const choice = chunk.choices?.[0];
      if (!choice) continue;
      const d: OADelta = choice.delta ?? {};
      if (typeof d.content === "string" && d.content) {
        roundText += d.content;
        cb.emit({ type: "text_delta", delta: d.content });
      }
      if (Array.isArray(d.tool_calls)) {
        for (const tc of d.tool_calls) {
          const slot = (toolBufs[tc.index] ??= { id: "", name: "", args: "" });
          if (tc.id) slot.id = tc.id;
          if (tc.function?.name) slot.name = tc.function.name;
          if (tc.function?.arguments) slot.args += tc.function.arguments;
        }
      }
      if (choice.finish_reason) finishReason = choice.finish_reason;
    }
    return false;
  };

  try {
    for (;;) {
      const { value, done } = await reader.read();
      if (done) break;
      // Normalize CRLF on the whole buffer so a "\r\n" split across reads
      // still collapses; events are then separated by "\n\n".
      buffer = (buffer + decoder.decode(value, { stream: true })).replace(/\r\n/g, "\n");
      let idx: number;
      while ((idx = buffer.indexOf("\n\n")) !== -1) {
        const raw = buffer.slice(0, idx);
        buffer = buffer.slice(idx + 2);
        if (handleEvent(raw)) return snapshot();
      }
    }
    // Flush the decoder and any last event lacking a terminating blank line.
    buffer = (buffer + decoder.decode()).replace(/\r\n/g, "\n");
    if (buffer.trim()) handleEvent(buffer);
    return snapshot();
  } finally {
    void reader.cancel().catch(() => undefined);
  }
}
/**
 * Turn the per-index tool-call buffers into OpenAI tool_call objects, in
 * ascending index order. Buffers that never received both an id and a name
 * are dropped, and empty argument strings become "{}".
 */
function collectToolCalls(bufs: Record<number, { id: string; name: string; args: string }>): OAToolCall[] {
  const calls: OAToolCall[] = [];
  for (const { id, name, args } of Object.values(bufs)) {
    if (!id || !name) continue;
    calls.push({ id, type: "function", function: { name, arguments: args || "{}" } });
  }
  return calls;
}