/**
 * OpenRouter provider — OpenAI-compatible chat completions.
 *
 * Exports:
 *   sendOnce()        — non-streaming, single call (no tools)
 *   streamWithTools() — streaming + tool-call loop (Pattern C)
 *
 * The tool-call loop runs locally:
 *   1. Call the model with messages + tools, stream=true.
 *   2. Read SSE deltas: text → emit `text_delta`; tool_calls → buffer the
 *      streamed arguments by tool-call index.
 *   3. When finish_reason='tool_calls': execute handlers locally, push results
 *      back as `tool` messages and call the model again. Repeat until the model
 *      stops calling tools or `maxTurns` rounds have run.
 *   4. Resolve with the assembled text, usage, tool trace and cited artifacts.
 *      This module does not emit a `done` event; the caller finishes the SSE
 *      stream.
 *
 * Free models (May 2026):
 *   deepseek/deepseek-v4-flash:free        — primary, solid tool calling
 *   nvidia/nemotron-3-super-120b-a12b:free — fallback
 */
import type { AGUIEvent, Artifact } from "./agui";
import { TOOL_DEFINITIONS, TOOL_HANDLERS, type ToolHandlerContext } from "./tools";

const PRIMARY = process.env.OPENROUTER_MODEL || "deepseek/deepseek-v4-flash:free";
const FALLBACK = process.env.OPENROUTER_FALLBACK_MODEL || "nvidia/nemotron-3-super-120b-a12b:free";
const ENDPOINT = "https://openrouter.ai/api/v1/chat/completions";

/** Max characters of a serialized tool result that is sent back to the model. */
const TOOL_RESULT_MAX_CHARS = 8000;

type OAMsg =
  | { role: "system" | "user"; content: string }
  | { role: "assistant"; content?: string | null; tool_calls?: OAToolCall[] }
  | { role: "tool"; content: string; tool_call_id: string; name?: string };

interface OAToolCall {
  id: string;
  type: "function";
  function: { name: string; arguments: string };
}

interface OADelta {
  role?: string;
  content?: string;
  tool_calls?: Array<{
    index: number;
    id?: string;
    type?: string;
    function?: { name?: string; arguments?: string };
  }>;
}

interface OAUsage {
  prompt_tokens?: number;
  completion_tokens?: number;
  total_tokens?: number;
}

/** Shape shared by streaming chunks and the non-streaming response body. */
interface OAStreamChunk {
  choices?: Array<{
    delta?: OADelta;
    finish_reason?: string | null;
    message?: { role: string; content?: string | null; tool_calls?: OAToolCall[] };
  }>;
  usage?: OAUsage;
  model?: string;
  error?: { message?: string };
}

/** Error thrown for non-2xx responses; `isRateLimit` is set for 429/402. */
type RateLimitError = Error & { isRateLimit?: boolean };

/** One executed tool call, as recorded in the returned trace. */
type ToolTraceEntry = { name: string; args: Record<string, unknown>; result: unknown };

/** Partially-streamed tool call, accumulated across SSE deltas. */
interface ToolCallBuffer {
  id: string;
  name: string;
  args: string;
}

/** Result of reading one streamed model round. */
interface SSERound {
  roundText: string;
  finishReason: string | null;
  toolCalls: OAToolCall[];
  usage?: OAUsage;
}

/** Request headers for OpenRouter. Throws if OPENROUTER_API_KEY is not set. */
function headers(): Record<string, string> {
  const apiKey = process.env.OPENROUTER_API_KEY;
  if (!apiKey) throw new Error("OPENROUTER_API_KEY not set");
  return {
    "Authorization": `Bearer ${apiKey}`,
    "Content-Type": "application/json",
    "HTTP-Referer": process.env.NEXT_PUBLIC_SITE_URL || "https://disclosure.top",
    "X-Title": "The Disclosure Bureau",
  };
}

/**
 * Build the Error for a non-2xx OpenRouter response (status + first 300 chars
 * of the body). Status 429 or 402 sets `isRateLimit`, which is what triggers
 * the fallback model in the streaming path.
 */
async function httpError(res: Response): Promise<RateLimitError> {
  const txt = await res.text();
  const err: RateLimitError = new Error(`openrouter HTTP ${res.status}: ${txt.slice(0, 300)}`);
  if (res.status === 429 || res.status === 402) err.isRateLimit = true;
  return err;
}

/** True when `e` is an Error tagged by `httpError` as a 429/402 response. */
function isRateLimitError(e: unknown): boolean {
  return e instanceof Error && (e as RateLimitError).isRateLimit === true;
}

export interface SendOnceReq {
  system: string;
  messages: Array<{ role: "user" | "assistant" | "system"; content: string }>;
  maxTokens?: number;
}

/**
 * Non-streaming single shot — used by claude-code fallback path and tests.
 *
 * Sends the system prompt plus the last 20 messages, without tools.
 *
 * @param req   prompt, conversation and optional `maxTokens` (default 1024)
 * @param model model id; defaults to PRIMARY (no automatic fallback here)
 * @returns the first choice's content ("" if absent), the model that answered
 *   and token usage when reported
 * @throws an Error with `isRateLimit` set on HTTP 429/402, a plain Error on
 *   other HTTP failures or when the body carries an `error` object
 */
export async function sendOnce(req: SendOnceReq, model = PRIMARY): Promise<{
  content: string;
  model: string;
  tokensIn?: number;
  tokensOut?: number;
}> {
  const body = {
    model,
    messages: [
      { role: "system", content: req.system },
      ...req.messages.slice(-20),
    ],
    max_tokens: req.maxTokens ?? 1024,
  };
  const res = await fetch(ENDPOINT, {
    method: "POST",
    headers: headers(),
    body: JSON.stringify(body),
  });
  if (!res.ok) throw await httpError(res);

  const data = (await res.json()) as OAStreamChunk;
  if (data.error) throw new Error(`openrouter error: ${data.error.message ?? "unknown error"}`);
  return {
    content: data.choices?.[0]?.message?.content ?? "",
    model: data.model ?? model,
    tokensIn: data.usage?.prompt_tokens,
    tokensOut: data.usage?.completion_tokens,
  };
}

/* ─── Pattern C: streaming + tool-call loop ─────────────────────────────── */

export interface StreamRequest {
  system: string;
  history: Array<{ role: "user" | "assistant"; content: string }>;
  userTurn: string;
  ctx: ToolHandlerContext;
  maxTurns?: number; // max tool-call loop iterations (default 5)
}

export interface StreamCallbacks {
  emit: (ev: AGUIEvent) => void;
}

/**
 * Stream with tool-call loop. Returns the final assistant content (assembled
 * across deltas + after all tool calls resolved), token usage, the trace of
 * executed tool calls and the artifacts that survived citation filtering.
 *
 * Artifacts produced by tools are held back during the loop and emitted once
 * at the end, keeping only those cited by the final prose (duplicates
 * removed). The returned `artifacts` are exactly the ones that were emitted.
 *
 * The caller is responsible for closing the SSE stream after this resolves.
 *
 * @throws when a model call fails (after the 429/402 fallback), when a
 *   response has no body, or when the stream reports an error. A failing
 *   forced-synthesis round is reported as an `error` event instead.
 */
export async function streamWithTools(
  req: StreamRequest,
  cb: StreamCallbacks,
): Promise<{
  content: string;
  model: string;
  tokensIn: number;
  tokensOut: number;
  toolCalls: Array<{ name: string; args: Record<string, unknown>; result: unknown }>;
  artifacts: Artifact[];
}> {
  const maxTurns = req.maxTurns ?? 5;

  // Collect artifacts silently — do NOT emit them to the SSE stream while
  // tools are running. We only know which ones are "relevant" after the
  // model has written its final prose, by matching [[doc-id/p007#cNNNN]]
  // citations in the text. Every other event passes straight through.
  const collectedArtifacts: Artifact[] = [];
  const sink: StreamCallbacks = {
    emit: (ev) => {
      if (ev.type === "artifact") {
        collectedArtifacts.push(ev.artifact);
        return; // suppressed — emitted later after filtering
      }
      cb.emit(ev);
    },
  };

  const messages: OAMsg[] = [
    { role: "system", content: req.system },
    ...req.history.map((m): OAMsg => ({ role: m.role, content: m.content })),
    { role: "user", content: req.userTurn },
  ];

  let assembledText = "";
  let totalIn = 0;
  let totalOut = 0;
  let modelUsed = PRIMARY;
  const toolTrace: ToolTraceEntry[] = [];

  for (let turn = 0; turn < maxTurns; turn++) {
    const { res, model } = await streamCallWithFallback(messages);
    modelUsed = model;
    if (!res.body) throw new Error("openrouter: no response body");

    const round = await readSSE(res.body, sink);
    assembledText += round.roundText;
    totalIn += round.usage?.prompt_tokens ?? 0;
    totalOut += round.usage?.completion_tokens ?? 0;

    // No tool calls → the model has produced its answer.
    if (round.finishReason !== "tool_calls" || round.toolCalls.length === 0) break;

    // Replay the assistant's tool-call turn, then one `tool` message per call,
    // so the next round sees the results.
    messages.push({
      role: "assistant",
      content: round.roundText || null,
      tool_calls: round.toolCalls,
    });
    for (const tc of round.toolCalls) {
      const entry = await executeToolCall(tc, req.ctx, sink);
      toolTrace.push(entry);
      messages.push({
        role: "tool",
        tool_call_id: tc.id,
        name: tc.function.name,
        content: serializeToolResult(entry.result),
      });
    }
  }

  // Forced synthesis: free-tier models often exhaust the tool-call budget
  // without ever producing prose. If the loop ended with empty text but the
  // model did call tools, run ONE more round without tools so the model must
  // answer in plain text.
  if (!assembledText.trim() && toolTrace.length > 0) {
    // Empty delta kept from the original flow; presumably lets the client see
    // activity before the extra round — TODO confirm with the frontend.
    sink.emit({ type: "text_delta", delta: "" });
    messages.push({
      role: "user",
      content:
        "Com base nas ferramentas que você acabou de chamar e nos resultados acima, " +
        "responda agora ao usuário EM TEXTO (3-8 frases), em português brasileiro. " +
        "Cite os chunks no formato [[doc-id/p007#c0042]]. Não chame mais nenhuma ferramenta.",
    });
    try {
      const res = await openrouterStreamCall(messages, modelUsed, { withTools: false });
      if (res.body) {
        const final = await readSSE(res.body, sink);
        assembledText += final.roundText;
        totalIn += final.usage?.prompt_tokens ?? 0;
        totalOut += final.usage?.completion_tokens ?? 0;
      }
    } catch (e) {
      sink.emit({ type: "error", message: `synthesis failed: ${e instanceof Error ? e.message : String(e)}` });
    }
  }

  // Emit only the cited, de-duplicated artifacts (bypassing the suppressing
  // sink) and return that same list so callers see what the UI received.
  const artifacts = selectCitedArtifacts(assembledText, collectedArtifacts);
  for (const artifact of artifacts) cb.emit({ type: "artifact", artifact });

  return {
    content: assembledText,
    model: modelUsed,
    tokensIn: totalIn,
    tokensOut: totalOut,
    toolCalls: toolTrace,
    artifacts,
  };
}

/**
 * Start one streaming round on PRIMARY, retrying once on FALLBACK when
 * PRIMARY fails with a 429/402. Any other failure, or a failing fallback call,
 * is rethrown.
 */
async function streamCallWithFallback(messages: OAMsg[]): Promise<{ res: Response; model: string }> {
  try {
    return { res: await openrouterStreamCall(messages, PRIMARY), model: PRIMARY };
  } catch (e) {
    if (!isRateLimitError(e)) throw e;
    return { res: await openrouterStreamCall(messages, FALLBACK), model: FALLBACK };
  }
}

/**
 * Run one tool call locally and report it on the stream.
 *
 * Emits `tool_start`, then `tool_result` (with duration). For `navigate_to`
 * with a target starting with "/", it also emits a `navigate` event. Handler
 * failures and unknown tool names become `{ error }` results, not exceptions.
 */
async function executeToolCall(
  tc: OAToolCall,
  ctx: ToolHandlerContext,
  sink: StreamCallbacks,
): Promise<ToolTraceEntry> {
  const name = tc.function.name;
  const args = parseToolArgs(tc.function.arguments);
  sink.emit({ type: "tool_start", id: tc.id, name, args });

  const handler = TOOL_HANDLERS[name];
  // Per-tool artifact sink: bridges typed artifacts produced by the handler
  // into the (suppressing) event sink as `artifact` events.
  const ctxWithSink: ToolHandlerContext = {
    ...ctx,
    emitArtifact: (artifact) => sink.emit({ type: "artifact", artifact }),
  };

  const t0 = Date.now();
  let result: unknown;
  try {
    if (!handler) throw new Error(`unknown tool: ${name}`);
    result = await handler(args, ctxWithSink);
  } catch (e) {
    result = { error: e instanceof Error ? e.message : String(e) };
  }
  sink.emit({ type: "tool_result", id: tc.id, result, durationMs: Date.now() - t0 });

  // Surface navigate_to as a UI event the frontend can render as a button.
  if (name === "navigate_to") {
    const target = String(args.target ?? "");
    const label = String(args.label ?? target);
    if (target.startsWith("/")) sink.emit({ type: "navigate", target, label });
  }

  return { name, args, result };
}

/**
 * Parse a model-supplied arguments string. Anything that is not a JSON object
 * (malformed JSON, `null`, arrays, primitives) yields `{}`, so handlers and the
 * `navigate_to` branch can always read properties safely.
 */
function parseToolArgs(raw: string): Record<string, unknown> {
  try {
    const parsed: unknown = JSON.parse(raw || "{}");
    if (parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)) {
      return parsed as Record<string, unknown>;
    }
  } catch {
    /* malformed JSON from the model — fall through to empty args */
  }
  return {};
}

/**
 * Serialize a tool result for the `tool` message, truncated to
 * TOOL_RESULT_MAX_CHARS. `JSON.stringify` returns undefined for `undefined`
 * (e.g. a handler with no return value) and throws on circular or BigInt
 * values, so both cases are mapped to valid JSON instead of crashing the loop.
 */
function serializeToolResult(result: unknown): string {
  let json: string | undefined;
  try {
    json = JSON.stringify(result);
  } catch (e) {
    json = JSON.stringify({
      error: `unserializable tool result: ${e instanceof Error ? e.message : String(e)}`,
    });
  }
  return (json ?? "null").slice(0, TOOL_RESULT_MAX_CHARS);
}

/**
 * Keep only the artifacts the assistant's prose actually cites, de-duplicated.
 *
 * Citation grammar: [[doc-id]], [[doc-id/p007]], [[doc-id#c0042]],
 * [[doc-id/p007#c0042]] (optionally with a `|label`). A trailing chunk anchor
 * (cNNNN, case-insensitive) is the strongest signal.
 * - Citation cards: kept when their chunk is cited. When no chunk is cited
 *   anywhere, they are kept if their doc is cited.
 * - Crop images: kept only when their chunk is cited.
 * - Other kinds (entity / case / hypothesis / nav cards): always kept.
 * Duplicates are dropped: one citation per chunk_id, one crop per chunk_id
 * (or src), identical JSON for other kinds.
 */
function selectCitedArtifacts(text: string, artifacts: Artifact[]): Artifact[] {
  const citedChunks = new Set<string>();
  const citedDocs = new Set<string>();
  const wikiLinkRe = /\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]/g;
  let m: RegExpExecArray | null;
  while ((m = wikiLinkRe.exec(text)) !== null) {
    const target = (m[1] ?? "").trim();
    const chunkMatch = /(c\d{4})$/i.exec(target);
    if (chunkMatch?.[1]) citedChunks.add(chunkMatch[1].toLowerCase());
    // Doc id is whatever comes before the first `/` or `#`.
    const docMatch = /^([A-Za-z0-9._-]+)/.exec(target);
    if (docMatch?.[1]) citedDocs.add(docMatch[1]);
  }

  const seen = new Set<string>();
  const selected: Artifact[] = [];
  for (const a of artifacts) {
    let keep: boolean;
    let key: string;
    if (a.kind === "citation") {
      keep = citedChunks.has(a.chunk_id.toLowerCase()) ||
        (citedDocs.has(a.doc_id) && citedChunks.size === 0);
      key = `cit:${a.chunk_id}`;
    } else if (a.kind === "crop_image") {
      keep = !!a.chunk_id && citedChunks.has(a.chunk_id.toLowerCase());
      key = `crop:${a.chunk_id ?? a.src}`;
    } else {
      keep = true;
      key = `${a.kind}:${JSON.stringify(a)}`;
    }
    if (!keep || seen.has(key)) continue;
    seen.add(key);
    selected.push(a);
  }
  return selected;
}

/**
 * POST a streaming chat-completions request. Tools are attached with
 * tool_choice "auto" unless `opts.withTools` is false.
 *
 * @throws an Error from `httpError` (with `isRateLimit` for 429/402) on non-2xx
 */
async function openrouterStreamCall(
  messages: OAMsg[],
  model: string,
  opts: { withTools?: boolean } = {},
): Promise<Response> {
  const withTools = opts.withTools !== false;
  const body: Record<string, unknown> = {
    model,
    messages,
    stream: true,
    max_tokens: 1024,
  };
  if (withTools) {
    body.tools = TOOL_DEFINITIONS;
    body.tool_choice = "auto";
  }
  const res = await fetch(ENDPOINT, {
    method: "POST",
    headers: headers(),
    body: JSON.stringify(body),
  });
  if (!res.ok) throw await httpError(res);
  return res;
}

/**
 * Read one OpenRouter SSE stream. Emits `text_delta` events via cb.emit and
 * returns the assembled text, last finish_reason, accumulated tool_calls and
 * usage.
 *
 * Events are split on a blank line ("\n\n"). Only `data:` lines are used, and
 * the `[DONE]` sentinel ends the round early. Unparseable payloads are skipped.
 * An event left in the buffer when the stream closes (no trailing blank line)
 * is still processed. The reader is cancelled on every exit path so the
 * connection is released even on an early return or an error.
 *
 * @throws when a chunk carries an `error` object or the stream itself errors
 */
async function readSSE(body: ReadableStream<Uint8Array>, cb: StreamCallbacks): Promise<SSERound> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let roundText = "";
  let finishReason: string | null = null;
  let usage: OAUsage | undefined = undefined;
  // tool_calls arrive as deltas across many chunks; accumulate by index.
  const toolBufs: Record<number, ToolCallBuffer> = {};

  const result = (): SSERound => ({ roundText, finishReason, toolCalls: collectToolCalls(toolBufs), usage });

  // Apply one `data:` payload; returns true on the [DONE] sentinel.
  const handlePayload = (payload: string): boolean => {
    if (payload === "[DONE]") return true;
    let chunk: OAStreamChunk;
    try {
      chunk = JSON.parse(payload) as OAStreamChunk;
    } catch {
      return false;
    }
    if (chunk.error) throw new Error(`openrouter stream error: ${chunk.error.message ?? "unknown error"}`);
    if (chunk.usage) usage = chunk.usage;
    const choice = chunk.choices?.[0];
    if (!choice) return false;

    const d = choice.delta ?? {};
    if (typeof d.content === "string" && d.content) {
      roundText += d.content;
      cb.emit({ type: "text_delta", delta: d.content });
    }
    if (Array.isArray(d.tool_calls)) {
      for (const tc of d.tool_calls) {
        const slot = (toolBufs[tc.index] ??= { id: "", name: "", args: "" });
        if (tc.id) slot.id = tc.id;
        if (tc.function?.name) slot.name = tc.function.name;
        if (tc.function?.arguments) slot.args += tc.function.arguments;
      }
    }
    if (choice.finish_reason) finishReason = choice.finish_reason;
    return false;
  };

  // Process every `data:` line of one SSE event; returns true on [DONE].
  const handleEvent = (raw: string): boolean => {
    for (const line of raw.split("\n")) {
      if (!line.startsWith("data:")) continue;
      if (handlePayload(line.slice(5).trim())) return true;
    }
    return false;
  };

  try {
    for (;;) {
      const { value, done } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      let idx: number;
      while ((idx = buffer.indexOf("\n\n")) !== -1) {
        const raw = buffer.slice(0, idx);
        buffer = buffer.slice(idx + 2);
        if (handleEvent(raw)) return result();
      }
    }
    // Stream closed without [DONE]: flush the decoder and handle any final
    // event that was not terminated by a blank line.
    buffer += decoder.decode();
    handleEvent(buffer);
    return result();
  } finally {
    // No-op if the stream already finished; otherwise releases the connection.
    void reader.cancel().catch(() => undefined);
  }
}

/** Turn the per-index buffers into complete tool calls (id and name required). */
function collectToolCalls(bufs: Record<number, ToolCallBuffer>): OAToolCall[] {
  return Object.values(bufs)
    .filter((b) => b.id && b.name)
    .map((b) => ({
      id: b.id,
      type: "function" as const,
      function: { name: b.name, arguments: b.args || "{}" },
    }));
}