fix(openai-shim): preserve final streaming usage chunks

Handle OpenAI-compatible SSE responses that report usage only in a trailing chunk with an empty choices array, so token accounting and budget enforcement stay correct.
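
For reference, the tail of such a stream (shape taken from the fixtures in the test below) looks like this: with stream_options.include_usage set, the usage totals arrive in a final data event whose choices array is empty, just before the [DONE] sentinel.

    data: {"object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

    data: {"object":"chat.completion.chunk","choices":[],"usage":{"prompt_tokens":123,"completion_tokens":45,"total_tokens":168}}

    data: [DONE]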
umairinayat
2026-04-01 12:33:06 +05:00
parent 2d7aa9c841
commit 0a5827c0b6
2 changed files with 170 additions and 3 deletions

@@ -0,0 +1,135 @@
import { afterEach, beforeEach, expect, test } from 'bun:test'

import { createOpenAIShimClient } from './openaiShim.ts'

type FetchType = typeof globalThis.fetch

const originalEnv = {
  OPENAI_BASE_URL: process.env.OPENAI_BASE_URL,
  OPENAI_API_KEY: process.env.OPENAI_API_KEY,
}
const originalFetch = globalThis.fetch

function makeSseResponse(lines: string[]): Response {
  const encoder = new TextEncoder()
  return new Response(
    new ReadableStream({
      start(controller) {
        for (const line of lines) {
          controller.enqueue(encoder.encode(line))
        }
        controller.close()
      },
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
      },
    },
  )
}

function makeStreamChunks(chunks: unknown[]): string[] {
  return [
    ...chunks.map(chunk => `data: ${JSON.stringify(chunk)}\n\n`),
    'data: [DONE]\n\n',
  ]
}

beforeEach(() => {
  process.env.OPENAI_BASE_URL = 'http://example.test/v1'
  process.env.OPENAI_API_KEY = 'test-key'
})

afterEach(() => {
  process.env.OPENAI_BASE_URL = originalEnv.OPENAI_BASE_URL
  process.env.OPENAI_API_KEY = originalEnv.OPENAI_API_KEY
  globalThis.fetch = originalFetch
})

test('preserves usage from final OpenAI stream chunk with empty choices', async () => {
  globalThis.fetch = (async (_input, init) => {
    const url = typeof _input === 'string' ? _input : _input.url
    expect(url).toBe('http://example.test/v1/chat/completions')
    const body = JSON.parse(String(init?.body))
    expect(body.stream).toBe(true)
    expect(body.stream_options).toEqual({ include_usage: true })
    const chunks = makeStreamChunks([
      {
        id: 'chatcmpl-1',
        object: 'chat.completion.chunk',
        model: 'fake-model',
        choices: [
          {
            index: 0,
            delta: { role: 'assistant', content: 'hello world' },
            finish_reason: null,
          },
        ],
      },
      {
        id: 'chatcmpl-1',
        object: 'chat.completion.chunk',
        model: 'fake-model',
        choices: [
          {
            index: 0,
            delta: {},
            finish_reason: 'stop',
          },
        ],
      },
      {
        id: 'chatcmpl-1',
        object: 'chat.completion.chunk',
        model: 'fake-model',
        choices: [],
        usage: {
          prompt_tokens: 123,
          completion_tokens: 45,
          total_tokens: 168,
        },
      },
    ])
    return makeSseResponse(chunks)
  }) as FetchType

  const client = createOpenAIShimClient({}) as {
    beta: {
      messages: {
        create: (
          params: Record<string, unknown>,
          options?: Record<string, unknown>,
        ) => Promise<unknown> & {
          withResponse: () => Promise<{ data: AsyncIterable<Record<string, unknown>> }>
        }
      }
    }
  }

  const result = await client.beta.messages
    .create({
      model: 'fake-model',
      system: 'test system',
      messages: [{ role: 'user', content: 'hello' }],
      max_tokens: 64,
      stream: true,
    })
    .withResponse()

  const events: Array<Record<string, unknown>> = []
  for await (const event of result.data) {
    events.push(event)
  }

  const usageEvent = events.find(
    event => event.type === 'message_delta' && typeof event.usage === 'object' && event.usage !== null,
  ) as { usage?: { input_tokens?: number; output_tokens?: number } } | undefined

  expect(usageEvent).toBeDefined()
  expect(usageEvent?.usage?.input_tokens).toBe(123)
  expect(usageEvent?.usage?.output_tokens).toBe(45)
})

@@ -267,6 +267,19 @@ function makeMessageId(): string {
   return `msg_${Math.random().toString(36).slice(2)}${Date.now().toString(36)}`
 }
+
+function convertChunkUsage(
+  usage: OpenAIStreamChunk['usage'] | undefined,
+): Partial<AnthropicUsage> | undefined {
+  if (!usage) return undefined
+  return {
+    input_tokens: usage.prompt_tokens ?? 0,
+    output_tokens: usage.completion_tokens ?? 0,
+    cache_creation_input_tokens: 0,
+    cache_read_input_tokens: 0,
+  }
+}
+
 /**
  * Async generator that transforms an OpenAI SSE stream into
  * Anthropic-format BetaRawMessageStreamEvent objects.
@@ -279,6 +292,8 @@ async function* openaiStreamToAnthropic(
   let contentBlockIndex = 0
   const activeToolCalls = new Map<number, { id: string; name: string; index: number }>()
   let hasEmittedContentStart = false
+  let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null
+  let hasEmittedFinalUsage = false
 
   // Emit message_start
   yield {
@@ -326,6 +341,8 @@ async function* openaiStreamToAnthropic(
       continue
     }
 
+    const chunkUsage = convertChunkUsage(chunk.usage)
+
     for (const choice of chunk.choices ?? []) {
       const delta = choice.delta
@@ -427,16 +444,31 @@ async function* openaiStreamToAnthropic(
             : choice.finish_reason === 'length'
               ? 'max_tokens'
               : 'end_turn'
+        lastStopReason = stopReason
         yield {
           type: 'message_delta',
           delta: { stop_reason: stopReason, stop_sequence: null },
-          usage: {
-            output_tokens: chunk.usage?.completion_tokens ?? 0,
-          },
+          ...(chunkUsage ? { usage: chunkUsage } : {}),
         }
+        if (chunkUsage) {
+          hasEmittedFinalUsage = true
+        }
       }
     }
+
+    if (
+      !hasEmittedFinalUsage &&
+      chunkUsage &&
+      (chunk.choices?.length ?? 0) === 0
+    ) {
+      yield {
+        type: 'message_delta',
+        delta: { stop_reason: lastStopReason, stop_sequence: null },
+        usage: chunkUsage,
+      }
+      hasEmittedFinalUsage = true
+    }
   }
 }
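
For context, a downstream consumer of the shim's Anthropic-style stream could do its token accounting as in the sketch below. This is a hypothetical illustration, not part of the commit; collectUsage and the event shape it assumes mirror the message_delta events emitted above.

    // Hypothetical consumer sketch (not part of this commit): capture the
    // usage that the shim attaches to its message_delta event, which the
    // hasEmittedFinalUsage guard above emits at most once per stream.
    type UsageDelta = { input_tokens?: number; output_tokens?: number }

    async function collectUsage(
      events: AsyncIterable<{ type: string; usage?: UsageDelta }>,
    ): Promise<{ inputTokens: number; outputTokens: number }> {
      let inputTokens = 0
      let outputTokens = 0
      for await (const event of events) {
        if (event.type === 'message_delta' && event.usage) {
          // Last report wins; without the trailing-chunk fix these stay 0
          // for providers that only send usage in the empty-choices chunk.
          inputTokens = event.usage.input_tokens ?? inputTokens
          outputTokens = event.usage.output_tokens ?? outputTokens
        }
      }
      return { inputTokens, outputTokens }
    }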