Merge pull request #14 from umairinayat/fix/openai-stream-usage-accounting
Fix missing usage accounting for final OpenAI stream chunks
This commit is contained in:
135
src/services/api/openaiShim.test.ts
Normal file
135
src/services/api/openaiShim.test.ts
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
import { afterEach, beforeEach, expect, test } from 'bun:test'
|
||||||
|
import { createOpenAIShimClient } from './openaiShim.ts'
|
||||||
|
|
||||||
|
type FetchType = typeof globalThis.fetch
|
||||||
|
|
||||||
|
const originalEnv = {
|
||||||
|
OPENAI_BASE_URL: process.env.OPENAI_BASE_URL,
|
||||||
|
OPENAI_API_KEY: process.env.OPENAI_API_KEY,
|
||||||
|
}
|
||||||
|
|
||||||
|
const originalFetch = globalThis.fetch
|
||||||
|
|
||||||
|
function makeSseResponse(lines: string[]): Response {
|
||||||
|
const encoder = new TextEncoder()
|
||||||
|
return new Response(
|
||||||
|
new ReadableStream({
|
||||||
|
start(controller) {
|
||||||
|
for (const line of lines) {
|
||||||
|
controller.enqueue(encoder.encode(line))
|
||||||
|
}
|
||||||
|
controller.close()
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'text/event-stream',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeStreamChunks(chunks: unknown[]): string[] {
|
||||||
|
return [
|
||||||
|
...chunks.map(chunk => `data: ${JSON.stringify(chunk)}\n\n`),
|
||||||
|
'data: [DONE]\n\n',
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
process.env.OPENAI_BASE_URL = 'http://example.test/v1'
|
||||||
|
process.env.OPENAI_API_KEY = 'test-key'
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
process.env.OPENAI_BASE_URL = originalEnv.OPENAI_BASE_URL
|
||||||
|
process.env.OPENAI_API_KEY = originalEnv.OPENAI_API_KEY
|
||||||
|
globalThis.fetch = originalFetch
|
||||||
|
})
|
||||||
|
|
||||||
|
test('preserves usage from final OpenAI stream chunk with empty choices', async () => {
|
||||||
|
globalThis.fetch = (async (_input, init) => {
|
||||||
|
const url = typeof _input === 'string' ? _input : _input.url
|
||||||
|
expect(url).toBe('http://example.test/v1/chat/completions')
|
||||||
|
|
||||||
|
const body = JSON.parse(String(init?.body))
|
||||||
|
expect(body.stream).toBe(true)
|
||||||
|
expect(body.stream_options).toEqual({ include_usage: true })
|
||||||
|
|
||||||
|
const chunks = makeStreamChunks([
|
||||||
|
{
|
||||||
|
id: 'chatcmpl-1',
|
||||||
|
object: 'chat.completion.chunk',
|
||||||
|
model: 'fake-model',
|
||||||
|
choices: [
|
||||||
|
{
|
||||||
|
index: 0,
|
||||||
|
delta: { role: 'assistant', content: 'hello world' },
|
||||||
|
finish_reason: null,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'chatcmpl-1',
|
||||||
|
object: 'chat.completion.chunk',
|
||||||
|
model: 'fake-model',
|
||||||
|
choices: [
|
||||||
|
{
|
||||||
|
index: 0,
|
||||||
|
delta: {},
|
||||||
|
finish_reason: 'stop',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: 'chatcmpl-1',
|
||||||
|
object: 'chat.completion.chunk',
|
||||||
|
model: 'fake-model',
|
||||||
|
choices: [],
|
||||||
|
usage: {
|
||||||
|
prompt_tokens: 123,
|
||||||
|
completion_tokens: 45,
|
||||||
|
total_tokens: 168,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
])
|
||||||
|
|
||||||
|
return makeSseResponse(chunks)
|
||||||
|
}) as FetchType
|
||||||
|
|
||||||
|
const client = createOpenAIShimClient({}) as {
|
||||||
|
beta: {
|
||||||
|
messages: {
|
||||||
|
create: (
|
||||||
|
params: Record<string, unknown>,
|
||||||
|
options?: Record<string, unknown>,
|
||||||
|
) => Promise<unknown> & {
|
||||||
|
withResponse: () => Promise<{ data: AsyncIterable<Record<string, unknown>> }>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await client.beta.messages
|
||||||
|
.create({
|
||||||
|
model: 'fake-model',
|
||||||
|
system: 'test system',
|
||||||
|
messages: [{ role: 'user', content: 'hello' }],
|
||||||
|
max_tokens: 64,
|
||||||
|
stream: true,
|
||||||
|
})
|
||||||
|
.withResponse()
|
||||||
|
|
||||||
|
const events: Array<Record<string, unknown>> = []
|
||||||
|
for await (const event of result.data) {
|
||||||
|
events.push(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
const usageEvent = events.find(
|
||||||
|
event => event.type === 'message_delta' && typeof event.usage === 'object' && event.usage !== null,
|
||||||
|
) as { usage?: { input_tokens?: number; output_tokens?: number } } | undefined
|
||||||
|
|
||||||
|
expect(usageEvent).toBeDefined()
|
||||||
|
expect(usageEvent?.usage?.input_tokens).toBe(123)
|
||||||
|
expect(usageEvent?.usage?.output_tokens).toBe(45)
|
||||||
|
})
|
||||||
@@ -265,6 +265,19 @@ function makeMessageId(): string {
|
|||||||
return `msg_${Math.random().toString(36).slice(2)}${Date.now().toString(36)}`
|
return `msg_${Math.random().toString(36).slice(2)}${Date.now().toString(36)}`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function convertChunkUsage(
|
||||||
|
usage: OpenAIStreamChunk['usage'] | undefined,
|
||||||
|
): Partial<AnthropicUsage> | undefined {
|
||||||
|
if (!usage) return undefined
|
||||||
|
|
||||||
|
return {
|
||||||
|
input_tokens: usage.prompt_tokens ?? 0,
|
||||||
|
output_tokens: usage.completion_tokens ?? 0,
|
||||||
|
cache_creation_input_tokens: 0,
|
||||||
|
cache_read_input_tokens: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Async generator that transforms an OpenAI SSE stream into
|
* Async generator that transforms an OpenAI SSE stream into
|
||||||
* Anthropic-format BetaRawMessageStreamEvent objects.
|
* Anthropic-format BetaRawMessageStreamEvent objects.
|
||||||
@@ -277,6 +290,8 @@ async function* openaiStreamToAnthropic(
|
|||||||
let contentBlockIndex = 0
|
let contentBlockIndex = 0
|
||||||
const activeToolCalls = new Map<number, { id: string; name: string; index: number }>()
|
const activeToolCalls = new Map<number, { id: string; name: string; index: number }>()
|
||||||
let hasEmittedContentStart = false
|
let hasEmittedContentStart = false
|
||||||
|
let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null
|
||||||
|
let hasEmittedFinalUsage = false
|
||||||
|
|
||||||
// Emit message_start
|
// Emit message_start
|
||||||
yield {
|
yield {
|
||||||
@@ -324,6 +339,8 @@ async function* openaiStreamToAnthropic(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const chunkUsage = convertChunkUsage(chunk.usage)
|
||||||
|
|
||||||
for (const choice of chunk.choices ?? []) {
|
for (const choice of chunk.choices ?? []) {
|
||||||
const delta = choice.delta
|
const delta = choice.delta
|
||||||
|
|
||||||
@@ -425,16 +442,31 @@ async function* openaiStreamToAnthropic(
|
|||||||
: choice.finish_reason === 'length'
|
: choice.finish_reason === 'length'
|
||||||
? 'max_tokens'
|
? 'max_tokens'
|
||||||
: 'end_turn'
|
: 'end_turn'
|
||||||
|
lastStopReason = stopReason
|
||||||
|
|
||||||
yield {
|
yield {
|
||||||
type: 'message_delta',
|
type: 'message_delta',
|
||||||
delta: { stop_reason: stopReason, stop_sequence: null },
|
delta: { stop_reason: stopReason, stop_sequence: null },
|
||||||
usage: {
|
...(chunkUsage ? { usage: chunkUsage } : {}),
|
||||||
output_tokens: chunk.usage?.completion_tokens ?? 0,
|
}
|
||||||
},
|
if (chunkUsage) {
|
||||||
|
hasEmittedFinalUsage = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
!hasEmittedFinalUsage &&
|
||||||
|
chunkUsage &&
|
||||||
|
(chunk.choices?.length ?? 0) === 0
|
||||||
|
) {
|
||||||
|
yield {
|
||||||
|
type: 'message_delta',
|
||||||
|
delta: { stop_reason: lastStopReason, stop_sequence: null },
|
||||||
|
usage: chunkUsage,
|
||||||
|
}
|
||||||
|
hasEmittedFinalUsage = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user