From 0a5827c0b68ab85254ddec8b25254f784f07a017 Mon Sep 17 00:00:00 2001
From: umairinayat
Date: Wed, 1 Apr 2026 12:33:06 +0500
Subject: [PATCH] fix(openai-shim): preserve final streaming usage chunks

Handle OpenAI-compatible SSE responses that send usage in a trailing
empty-choices chunk so token accounting and budget enforcement stay
correct.
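
For illustration, using the shapes exercised by the new test (payloads
trimmed to the relevant keys), such a server ends the stream like:

  data: {"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
  data: {"choices":[],"usage":{"prompt_tokens":123,"completion_tokens":45,"total_tokens":168}}
  data: [DONE]

Previously the shim read usage only inside the per-choice finish_reason
branch, so a usage-only chunk with empty choices never reached the
conversion at all, and prompt_tokens were never mapped.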
---
 src/services/api/openaiShim.test.ts | 135 ++++++++++++++++++++++++++++
 src/services/api/openaiShim.ts      |  38 +++++++-
 2 files changed, 170 insertions(+), 3 deletions(-)
 create mode 100644 src/services/api/openaiShim.test.ts

diff --git a/src/services/api/openaiShim.test.ts b/src/services/api/openaiShim.test.ts
new file mode 100644
index 00000000..34f81678
--- /dev/null
+++ b/src/services/api/openaiShim.test.ts
@@ -0,0 +1,135 @@
+import { afterEach, beforeEach, expect, test } from 'bun:test'
+import { createOpenAIShimClient } from './openaiShim.ts'
+
+type FetchType = typeof globalThis.fetch
+
+const originalEnv = {
+  OPENAI_BASE_URL: process.env.OPENAI_BASE_URL,
+  OPENAI_API_KEY: process.env.OPENAI_API_KEY,
+}
+
+const originalFetch = globalThis.fetch
+
+function makeSseResponse(lines: string[]): Response {
+  const encoder = new TextEncoder()
+  return new Response(
+    new ReadableStream({
+      start(controller) {
+        for (const line of lines) {
+          controller.enqueue(encoder.encode(line))
+        }
+        controller.close()
+      },
+    }),
+    {
+      headers: {
+        'Content-Type': 'text/event-stream',
+      },
+    },
+  )
+}
+
+function makeStreamChunks(chunks: unknown[]): string[] {
+  return [
+    ...chunks.map(chunk => `data: ${JSON.stringify(chunk)}\n\n`),
+    'data: [DONE]\n\n',
+  ]
+}
+
+beforeEach(() => {
+  process.env.OPENAI_BASE_URL = 'http://example.test/v1'
+  process.env.OPENAI_API_KEY = 'test-key'
+})
+
+afterEach(() => {
+  process.env.OPENAI_BASE_URL = originalEnv.OPENAI_BASE_URL
+  process.env.OPENAI_API_KEY = originalEnv.OPENAI_API_KEY
+  globalThis.fetch = originalFetch
+})
+
+test('preserves usage from final OpenAI stream chunk with empty choices', async () => {
+  globalThis.fetch = (async (_input, init) => {
+    const url = typeof _input === 'string' ? _input : _input.url
+    expect(url).toBe('http://example.test/v1/chat/completions')
+
+    const body = JSON.parse(String(init?.body))
+    expect(body.stream).toBe(true)
+    expect(body.stream_options).toEqual({ include_usage: true })
+
+    const chunks = makeStreamChunks([
+      {
+        id: 'chatcmpl-1',
+        object: 'chat.completion.chunk',
+        model: 'fake-model',
+        choices: [
+          {
+            index: 0,
+            delta: { role: 'assistant', content: 'hello world' },
+            finish_reason: null,
+          },
+        ],
+      },
+      {
+        id: 'chatcmpl-1',
+        object: 'chat.completion.chunk',
+        model: 'fake-model',
+        choices: [
+          {
+            index: 0,
+            delta: {},
+            finish_reason: 'stop',
+          },
+        ],
+      },
+      {
+        id: 'chatcmpl-1',
+        object: 'chat.completion.chunk',
+        model: 'fake-model',
+        choices: [],
+        usage: {
+          prompt_tokens: 123,
+          completion_tokens: 45,
+          total_tokens: 168,
+        },
+      },
+    ])
+
+    return makeSseResponse(chunks)
+  }) as FetchType
+
+  const client = createOpenAIShimClient({}) as {
+    beta: {
+      messages: {
+        create: (
+          params: Record<string, unknown>,
+          options?: Record<string, unknown>,
+        ) => Promise<unknown> & {
+          withResponse: () => Promise<{ data: AsyncIterable<Record<string, unknown>> }>
+        }
+      }
+    }
+  }
+
+  const result = await client.beta.messages
+    .create({
+      model: 'fake-model',
+      system: 'test system',
+      messages: [{ role: 'user', content: 'hello' }],
+      max_tokens: 64,
+      stream: true,
+    })
+    .withResponse()
+
+  const events: Array<Record<string, unknown>> = []
+  for await (const event of result.data) {
+    events.push(event)
+  }
+
+  const usageEvent = events.find(
+    event => event.type === 'message_delta' && typeof event.usage === 'object' && event.usage !== null,
+  ) as { usage?: { input_tokens?: number; output_tokens?: number } } | undefined
+
+  expect(usageEvent).toBeDefined()
+  expect(usageEvent?.usage?.input_tokens).toBe(123)
+  expect(usageEvent?.usage?.output_tokens).toBe(45)
+})
diff --git a/src/services/api/openaiShim.ts b/src/services/api/openaiShim.ts
index c3ca3393..b737b04a 100644
--- a/src/services/api/openaiShim.ts
+++ b/src/services/api/openaiShim.ts
@@ -267,6 +267,19 @@ function makeMessageId(): string {
   return `msg_${Math.random().toString(36).slice(2)}${Date.now().toString(36)}`
 }
 
+function convertChunkUsage(
+  usage: OpenAIStreamChunk['usage'] | undefined,
+): Partial<BetaUsage> | undefined {
+  if (!usage) return undefined
+
+  return {
+    input_tokens: usage.prompt_tokens ?? 0,
+    output_tokens: usage.completion_tokens ?? 0,
+    cache_creation_input_tokens: 0,
+    cache_read_input_tokens: 0,
+  }
+}
+
 /**
  * Async generator that transforms an OpenAI SSE stream into
  * Anthropic-format BetaRawMessageStreamEvent objects.
@@ -279,6 +292,8 @@ async function* openaiStreamToAnthropic(
   let contentBlockIndex = 0
   const activeToolCalls = new Map()
   let hasEmittedContentStart = false
+  let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null
+  let hasEmittedFinalUsage = false
 
   // Emit message_start
   yield {
@@ -326,6 +341,8 @@
       continue
     }
 
+    const chunkUsage = convertChunkUsage(chunk.usage)
+
     for (const choice of chunk.choices ?? []) {
       const delta = choice.delta
 
@@ -427,16 +444,31 @@
           : choice.finish_reason === 'length'
             ? 'max_tokens'
            : 'end_turn'
+        lastStopReason = stopReason
 
         yield {
           type: 'message_delta',
           delta: { stop_reason: stopReason, stop_sequence: null },
-          usage: {
-            output_tokens: chunk.usage?.completion_tokens ?? 0,
-          },
+          ...(chunkUsage ? { usage: chunkUsage } : {}),
+        }
+        if (chunkUsage) {
+          hasEmittedFinalUsage = true
         }
       }
     }
+
+    if (
+      !hasEmittedFinalUsage &&
+      chunkUsage &&
+      (chunk.choices?.length ?? 0) === 0
+    ) {
+      yield {
+        type: 'message_delta',
+        delta: { stop_reason: lastStopReason, stop_sequence: null },
+        usage: chunkUsage,
+      }
+      hasEmittedFinalUsage = true
+    }
 
   }
 }