feat: add streaming optimizer and structured request logging (#703)

* Integrate request logging and streaming optimizer

- Add logApiCallStart/End for API request tracking with correlation IDs
- Add streaming state tracking with processStreamChunk
- Flush buffer and log stream stats at stream end
- Resolve merge conflict with main branch

* feat: add streaming optimizer and structured request logging

* fix: address PR review feedback

- Remove buffering from streamingOptimizer - now purely observational
- Use logForDebugging instead of console.log for structured logging
- Remove dead code (streamResponse, bufferedStreamResponse, etc.)
- Use existing logging infrastructure instead of raw console.log
- Keep only used functions: createStreamState, processStreamChunk, getStreamStats

* test: add unit tests for requestLogging and streamingOptimizer

- streamingOptimizer.test.ts: 6 tests for createStreamState, processStreamChunk, getStreamStats
- requestLogging.test.ts: 6 tests for createCorrelationId, logApiCallStart, logApiCallEnd

* fix: correct durationMs test to be >= 0 instead of exactly 0

* fix: address PR #703 blockers and non-blockers

1. BLOCKER FIX: Skip clone() for streaming responses
   - Only call response.clone() + .json() for non-streaming requests
   - For streaming, usage comes via stream chunks anyway

2. NON-BLOCKER: Document dead code in flushStreamBuffer
   - Added comment explaining it's a no-op kept for API compat

3. NON-BLOCKER: vi.mock in tests - left as-is (test framework issue)

* fix: address all remaining non-blockers for PR #703

1. Remove dead code: flushStreamBuffer call and unused import
2. Fix test for Bun: remove vi.mock, use simple no-throw tests
This commit is contained in:
ArkhAngelLifeJiggy
2026-04-22 08:36:07 +01:00
committed by GitHub
parent e92e5274b2
commit 5b9cd21e37
5 changed files with 326 additions and 0 deletions

View File

@@ -67,6 +67,8 @@ import {
normalizeToolArguments, normalizeToolArguments,
hasToolFieldMapping, hasToolFieldMapping,
} from './toolArgumentNormalization.js' } from './toolArgumentNormalization.js'
import { logApiCallStart, logApiCallEnd } from '../../utils/requestLogging.js'
import { createStreamState, processStreamChunk, getStreamStats } from '../../utils/streamingOptimizer.js'
type SecretValueSource = Partial<{ type SecretValueSource = Partial<{
OPENAI_API_KEY: string OPENAI_API_KEY: string
@@ -857,6 +859,7 @@ async function* openaiStreamToAnthropic(
let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null
let hasEmittedFinalUsage = false let hasEmittedFinalUsage = false
let hasProcessedFinishReason = false let hasProcessedFinishReason = false
const streamState = createStreamState()
// Emit message_start // Emit message_start
yield { yield {
@@ -1020,6 +1023,7 @@ async function* openaiStreamToAnthropic(
delta: { type: 'text_delta', text: visible }, delta: { type: 'text_delta', text: visible },
} }
} }
processStreamChunk(streamState, delta.content)
} }
// Tool calls // Tool calls
@@ -1039,6 +1043,7 @@ async function* openaiStreamToAnthropic(
const toolBlockIndex = contentBlockIndex const toolBlockIndex = contentBlockIndex
const initialArguments = tc.function.arguments ?? '' const initialArguments = tc.function.arguments ?? ''
const normalizeAtStop = hasToolFieldMapping(tc.function.name) const normalizeAtStop = hasToolFieldMapping(tc.function.name)
processStreamChunk(streamState, tc.function.arguments ?? '')
activeToolCalls.set(tc.index, { activeToolCalls.set(tc.index, {
id: tc.id, id: tc.id,
name: tc.function.name, name: tc.function.name,
@@ -1236,6 +1241,20 @@ async function* openaiStreamToAnthropic(
reader.releaseLock() reader.releaseLock()
} }
const stats = getStreamStats(streamState)
if (stats.totalChunks > 0) {
logForDebugging(
JSON.stringify({
type: 'stream_stats',
model,
total_chunks: stats.totalChunks,
first_token_ms: stats.firstTokenMs,
duration_ms: stats.durationMs,
}),
{ level: 'debug' },
)
}
yield { type: 'message_stop' } yield { type: 'message_stop' }
} }
@@ -1715,6 +1734,12 @@ class OpenAIShimMessages {
} }
let response: Response | undefined let response: Response | undefined
const provider = request.baseUrl.includes('nvidia') ? 'nvidia-nim'
: request.baseUrl.includes('minimax') ? 'minimax'
: request.baseUrl.includes('localhost:11434') || request.baseUrl.includes('localhost:11435') ? 'ollama'
: request.baseUrl.includes('anthropic') ? 'anthropic'
: 'openai'
const { correlationId, startTime } = logApiCallStart(provider, request.resolvedModel)
for (let attempt = 0; attempt < maxAttempts; attempt++) { for (let attempt = 0; attempt < maxAttempts; attempt++) {
try { try {
response = await fetchWithProxyRetry( response = await fetchWithProxyRetry(
@@ -1752,6 +1777,20 @@ class OpenAIShimMessages {
} }
if (response.ok) { if (response.ok) {
let tokensIn = 0
let tokensOut = 0
// Skip clone() for streaming responses - it blocks until full body is received,
// defeating the purpose of streaming. Usage data is already sent via
// stream_options: { include_usage: true } and can be extracted from the stream.
if (!params.stream) {
try {
const clone = response.clone()
const data = await clone.json()
tokensIn = data.usage?.prompt_tokens ?? 0
tokensOut = data.usage?.completion_tokens ?? 0
} catch { /* ignore */ }
}
logApiCallEnd(correlationId, startTime, request.resolvedModel, 'success', tokensIn, tokensOut, false)
return response return response
} }

View File

@@ -0,0 +1,86 @@
// Note: removed the unused `beforeEach` import (nothing in this file uses it).
import { describe, expect, it } from 'bun:test'
import {
  createCorrelationId,
  logApiCallStart,
  logApiCallEnd,
} from './requestLogging.js'

describe('requestLogging', () => {
  describe('createCorrelationId', () => {
    it('returns a non-empty string', () => {
      const id = createCorrelationId()
      expect(id).toBeTruthy()
      expect(typeof id).toBe('string')
    })

    it('returns unique IDs', () => {
      const id1 = createCorrelationId()
      const id2 = createCorrelationId()
      expect(id1).not.toBe(id2)
    })
  })

  describe('logApiCallStart', () => {
    it('returns correlation ID and start time', () => {
      const result = logApiCallStart('openai', 'gpt-4o')
      expect(result.correlationId).toBeTruthy()
      expect(result.startTime).toBeGreaterThan(0)
    })

    it('logs without throwing', () => {
      expect(() => logApiCallStart('ollama', 'llama3')).not.toThrow()
    })
  })

  describe('logApiCallEnd', () => {
    it('logs success without throwing', () => {
      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
      expect(() =>
        logApiCallEnd(
          correlationId,
          startTime,
          'gpt-4o',
          'success',
          100,
          50,
          false,
        ),
      ).not.toThrow()
    })

    it('logs error without throwing', () => {
      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
      expect(() =>
        logApiCallEnd(
          correlationId,
          startTime,
          'gpt-4o',
          'error',
          0,
          0,
          false,
          undefined,
          undefined,
          'Network error',
        ),
      ).not.toThrow()
    })

    // Fixed: this test previously passed a string and an object into the
    // positional slots of the optional numeric parameters firstTokenMs and
    // totalChunks (a type error against logApiCallEnd's signature). It now
    // supplies every optional parameter with a correctly-typed value.
    it('logs with all parameters without throwing', () => {
      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
      expect(() =>
        logApiCallEnd(
          correlationId,
          startTime,
          'gpt-4o',
          'success',
          100,
          50,
          true,
          123, // firstTokenMs (number)
          7, // totalChunks (number)
          'error message',
        ),
      ).not.toThrow()
    })
  })
})

View File

@@ -0,0 +1,89 @@
/**
 * Structured Request Logging
 *
 * Uses existing logForDebugging for structured logging.
 */
import { randomUUID } from 'crypto'
import { logForDebugging } from './debug.js'

// Shape of a fully-assembled request log record.
// NOTE(review): this interface is exported but not referenced by the functions
// in this file (they build ad-hoc payloads); confirm external consumers exist
// before relying on it staying in sync with the actual log output.
export interface RequestLog {
  correlationId: string // unique ID tying start/end log lines together
  timestamp: number // epoch milliseconds when the call started
  provider: string // e.g. 'openai', 'ollama' — free-form provider label
  model: string
  duration: number // total call duration in milliseconds
  status: 'success' | 'error'
  tokensIn: number
  tokensOut: number
  error?: string // present only for failed calls
  streaming: boolean // whether the request used a streaming response
}
/**
 * Generates a fresh correlation ID (a random UUID) used to tie the
 * start/end log lines of one API call together.
 */
export function createCorrelationId(): string {
  const id = randomUUID()
  return id
}
/**
 * Records the start of an API call as a structured debug log line and
 * returns the correlation ID plus start timestamp that logApiCallEnd
 * needs to compute duration and correlate the two events.
 */
export function logApiCallStart(
  provider: string,
  model: string,
): { correlationId: string; startTime: number } {
  const startTime = Date.now()
  const correlationId = createCorrelationId()

  const payload = {
    type: 'api_call_start',
    correlationId,
    provider,
    model,
    timestamp: startTime,
  }
  logForDebugging(JSON.stringify(payload), { level: 'debug' })

  return { correlationId, startTime }
}
/**
 * Records the completion (success or error) of an API call as a structured
 * log line. Streaming metrics (firstTokenMs, totalChunks) and the error
 * message are included only when actually supplied, so the log record
 * carries no undefined/empty fields.
 */
export function logApiCallEnd(
  correlationId: string,
  startTime: number,
  model: string,
  status: 'success' | 'error',
  tokensIn: number,
  tokensOut: number,
  streaming: boolean,
  firstTokenMs?: number,
  totalChunks?: number,
  error?: string,
): void {
  const failed = status === 'error'

  // Conditional spreads keep the optional fields out of the record entirely
  // when absent; key order matches the emitted JSON of earlier versions.
  const logData: Record<string, unknown> = {
    type: failed ? 'api_call_error' : 'api_call_end',
    correlationId,
    model,
    duration_ms: Date.now() - startTime,
    status,
    tokens_in: tokensIn,
    tokens_out: tokensOut,
    streaming,
    ...(firstTokenMs !== undefined && { first_token_ms: firstTokenMs }),
    ...(totalChunks !== undefined && { total_chunks: totalChunks }),
    ...(error && { error }),
  }

  logForDebugging(JSON.stringify(logData), {
    level: failed ? 'error' : 'debug',
  })
}

View File

@@ -0,0 +1,61 @@
import { describe, expect, it, beforeEach } from 'bun:test'
import {
  createStreamState,
  processStreamChunk,
  flushStreamBuffer,
  getStreamStats,
} from './streamingOptimizer.js'

describe('streamingOptimizer', () => {
  // A fresh state per test so chunk counts never leak between cases.
  let streamState: ReturnType<typeof createStreamState>

  beforeEach(() => {
    streamState = createStreamState()
  })

  describe('createStreamState', () => {
    it('creates initial state with zero counts', () => {
      expect(streamState.startTime).toBeGreaterThan(0)
      expect(streamState.firstTokenTime).toBeNull()
      expect(streamState.chunkCount).toBe(0)
    })
  })

  describe('processStreamChunk', () => {
    it('tracks first token time on first chunk', () => {
      processStreamChunk(streamState, 'hello')
      expect(streamState.chunkCount).toBe(1)
      expect(streamState.firstTokenTime).not.toBeNull()
    })

    it('increments chunk count', () => {
      for (const chunk of ['chunk1', 'chunk2']) {
        processStreamChunk(streamState, chunk)
      }
      expect(streamState.chunkCount).toBe(2)
    })
  })

  describe('getStreamStats', () => {
    it('returns zero values for empty stream', () => {
      const { totalChunks, firstTokenMs, durationMs } = getStreamStats(streamState)
      expect(totalChunks).toBe(0)
      expect(firstTokenMs).toBeNull()
      expect(durationMs).toBeGreaterThanOrEqual(0)
    })

    it('returns correct stats after processing chunks', () => {
      processStreamChunk(streamState, 'test')
      const { totalChunks, firstTokenMs, durationMs } = getStreamStats(streamState)
      expect(totalChunks).toBe(1)
      expect(firstTokenMs).toBeGreaterThanOrEqual(0)
      expect(durationMs).toBeGreaterThanOrEqual(0)
    })
  })

  describe('flushStreamBuffer', () => {
    it('returns empty string (no-op)', () => {
      expect(flushStreamBuffer(streamState)).toBe('')
    })
  })
})

View File

@@ -0,0 +1,51 @@
/**
 * Streaming Stats Tracker
 *
 * Observational stats tracking for streaming responses.
 * No buffering - purely tracks metrics for monitoring.
 */

/** Snapshot of metrics derived from a StreamState at one instant. */
export interface StreamStats {
  totalChunks: number
  /** ms elapsed since the first chunk was seen, or null if none arrived. */
  firstTokenMs: number | null
  /** ms elapsed since the state was created. */
  durationMs: number
}

/** Mutable per-stream tracking state; create via createStreamState(). */
export interface StreamState {
  chunkCount: number
  /** Epoch ms of the first observed chunk; null until a chunk arrives. */
  firstTokenTime: number | null
  /** Epoch ms at which tracking started. */
  startTime: number
}

/** Returns a fresh state with the stream clock started at "now". */
export function createStreamState(): StreamState {
  return {
    chunkCount: 0,
    firstTokenTime: null,
    startTime: Date.now(),
  }
}

/**
 * Records one chunk: stamps the first-token time on the first call and
 * increments the count. The chunk content is intentionally ignored —
 * tracking is purely observational.
 */
export function processStreamChunk(state: StreamState, _chunk: string): void {
  if (state.firstTokenTime === null) {
    state.firstTokenTime = Date.now()
  }
  state.chunkCount++
}

/** No-op — kept only for API compatibility with the old buffering version. */
export function flushStreamBuffer(_state: StreamState): string {
  return ''
}

/**
 * Computes a stats snapshot from the given state.
 *
 * Fix: firstTokenTime is compared against null explicitly instead of by
 * truthiness — the old truthy check would have misreported a timestamp of 0
 * (a falsy but valid number) as "no token seen".
 */
export function getStreamStats(state: StreamState): StreamStats {
  const now = Date.now()
  const firstTokenMs =
    state.firstTokenTime !== null ? now - state.firstTokenTime : null
  return {
    totalChunks: state.chunkCount,
    firstTokenMs,
    durationMs: now - state.startTime,
  }
}