diff --git a/src/services/api/openaiShim.ts b/src/services/api/openaiShim.ts
index 1f7ed674..c62b7f21 100644
--- a/src/services/api/openaiShim.ts
+++ b/src/services/api/openaiShim.ts
@@ -67,6 +67,8 @@ import {
   normalizeToolArguments,
   hasToolFieldMapping,
 } from './toolArgumentNormalization.js'
+import { logApiCallStart, logApiCallEnd } from '../../utils/requestLogging.js'
+import { createStreamState, processStreamChunk, getStreamStats } from '../../utils/streamingOptimizer.js'
 
 type SecretValueSource = Partial<{
   OPENAI_API_KEY: string
@@ -857,6 +859,7 @@ async function* openaiStreamToAnthropic(
   let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null
   let hasEmittedFinalUsage = false
   let hasProcessedFinishReason = false
+  const streamState = createStreamState()
 
   // Emit message_start
   yield {
@@ -1020,6 +1023,7 @@
           delta: { type: 'text_delta', text: visible },
         }
       }
+      processStreamChunk(streamState, delta.content)
     }
 
     // Tool calls
@@ -1039,6 +1043,7 @@
       const toolBlockIndex = contentBlockIndex
       const initialArguments = tc.function.arguments ?? ''
       const normalizeAtStop = hasToolFieldMapping(tc.function.name)
+      processStreamChunk(streamState, tc.function.arguments ?? '')
       activeToolCalls.set(tc.index, {
         id: tc.id,
         name: tc.function.name,
@@ -1236,6 +1241,20 @@
     reader.releaseLock()
   }
 
+  const stats = getStreamStats(streamState)
+  if (stats.totalChunks > 0) {
+    logForDebugging(
+      JSON.stringify({
+        type: 'stream_stats',
+        model,
+        total_chunks: stats.totalChunks,
+        first_token_ms: stats.firstTokenMs,
+        duration_ms: stats.durationMs,
+      }),
+      { level: 'debug' },
+    )
+  }
+
   yield { type: 'message_stop' }
 }
@@ -1715,6 +1734,12 @@ class OpenAIShimMessages {
     }
 
     let response: Response | undefined
+    const provider = request.baseUrl.includes('nvidia') ? 'nvidia-nim'
+      : request.baseUrl.includes('minimax') ? 'minimax'
+      : request.baseUrl.includes('localhost:11434') || request.baseUrl.includes('localhost:11435') ? 'ollama'
+      : request.baseUrl.includes('anthropic') ? 'anthropic'
+      : 'openai'
+    const { correlationId, startTime } = logApiCallStart(provider, request.resolvedModel)
     for (let attempt = 0; attempt < maxAttempts; attempt++) {
       try {
         response = await fetchWithProxyRetry(
@@ -1752,6 +1777,20 @@
     }
 
     if (response.ok) {
+      let tokensIn = 0
+      let tokensOut = 0
+      // Skip clone() for streaming responses - it blocks until the full body is
+      // received, defeating the purpose of streaming. Usage data is already sent
+      // via stream_options: { include_usage: true } and can be extracted from the stream.
+      if (!params.stream) {
+        try {
+          const clone = response.clone()
+          const data = await clone.json()
+          tokensIn = data.usage?.prompt_tokens ?? 0
+          tokensOut = data.usage?.completion_tokens ?? 0
+        } catch { /* ignore */ }
+      }
+      logApiCallEnd(correlationId, startTime, request.resolvedModel, 'success', tokensIn, tokensOut, params.stream ?? false)
       return response
     }
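As the comment in the `response.ok` branch notes, `clone()` is skipped for streaming responses because usage arrives in the stream itself: with `stream_options: { include_usage: true }`, OpenAI-compatible servers append a final chunk whose `usage` field carries the token counts. A minimal sketch of recovering those counts from SSE lines; the helper name and chunk shape are illustrative, not part of this diff:

```ts
// Illustrative helper, not part of this PR: scan OpenAI-style SSE lines
// for the final chunk that carries `usage` when include_usage is set.
interface StreamUsage {
  prompt_tokens?: number
  completion_tokens?: number
}

function extractUsageFromSseLine(line: string): StreamUsage | null {
  if (!line.startsWith('data: ') || line === 'data: [DONE]') return null
  try {
    const chunk = JSON.parse(line.slice('data: '.length))
    // Earlier chunks report usage as null; only the final one fills it in.
    return chunk.usage ?? null
  } catch {
    return null // incomplete JSON across chunk boundaries; skip
  }
}
```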
diff --git a/src/utils/requestLogging.test.ts b/src/utils/requestLogging.test.ts
new file mode 100644
index 00000000..d7045602
--- /dev/null
+++ b/src/utils/requestLogging.test.ts
@@ -0,0 +1,86 @@
+import { describe, expect, it } from 'bun:test'
+import {
+  createCorrelationId,
+  logApiCallStart,
+  logApiCallEnd,
+} from './requestLogging.js'
+
+describe('requestLogging', () => {
+  describe('createCorrelationId', () => {
+    it('returns a non-empty string', () => {
+      const id = createCorrelationId()
+      expect(id).toBeTruthy()
+      expect(typeof id).toBe('string')
+    })
+
+    it('returns unique IDs', () => {
+      const id1 = createCorrelationId()
+      const id2 = createCorrelationId()
+      expect(id1).not.toBe(id2)
+    })
+  })
+
+  describe('logApiCallStart', () => {
+    it('returns correlation ID and start time', () => {
+      const result = logApiCallStart('openai', 'gpt-4o')
+      expect(result.correlationId).toBeTruthy()
+      expect(result.startTime).toBeGreaterThan(0)
+    })
+
+    it('logs without throwing', () => {
+      expect(() => logApiCallStart('ollama', 'llama3')).not.toThrow()
+    })
+  })
+
+  describe('logApiCallEnd', () => {
+    it('logs success without throwing', () => {
+      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
+      expect(() =>
+        logApiCallEnd(
+          correlationId,
+          startTime,
+          'gpt-4o',
+          'success',
+          100,
+          50,
+          false,
+        ),
+      ).not.toThrow()
+    })
+
+    it('logs error without throwing', () => {
+      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
+      expect(() =>
+        logApiCallEnd(
+          correlationId,
+          startTime,
+          'gpt-4o',
+          'error',
+          0,
+          0,
+          false,
+          undefined,
+          undefined,
+          'Network error',
+        ),
+      ).not.toThrow()
+    })
+
+    it('logs with all parameters without throwing', () => {
+      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
+      expect(() =>
+        logApiCallEnd(
+          correlationId,
+          startTime,
+          'gpt-4o',
+          'success',
+          100,
+          50,
+          true,
+          150,
+          42,
+          'error message',
+        ),
+      ).not.toThrow()
+    })
+  })
+})
\ No newline at end of file
diff --git a/src/utils/requestLogging.ts b/src/utils/requestLogging.ts
new file mode 100644
index 00000000..62b5b9c8
--- /dev/null
+++ b/src/utils/requestLogging.ts
@@ -0,0 +1,89 @@
+/**
+ * Structured Request Logging
+ *
+ * Uses the existing logForDebugging helper for structured logging.
+ */
+
+import { randomUUID } from 'crypto'
+import { logForDebugging } from './debug.js'
+
+export interface RequestLog {
+  correlationId: string
+  timestamp: number
+  provider: string
+  model: string
+  duration: number
+  status: 'success' | 'error'
+  tokensIn: number
+  tokensOut: number
+  error?: string
+  streaming: boolean
+}
+
+export function createCorrelationId(): string {
+  return randomUUID()
+}
+
+export function logApiCallStart(
+  provider: string,
+  model: string,
+): { correlationId: string; startTime: number } {
+  const correlationId = createCorrelationId()
+  const startTime = Date.now()
+
+  logForDebugging(
+    JSON.stringify({
+      type: 'api_call_start',
+      correlationId,
+      provider,
+      model,
+      timestamp: startTime,
+    }),
+    { level: 'debug' },
+  )
+
+  return { correlationId, startTime }
+}
+
+export function logApiCallEnd(
+  correlationId: string,
+  startTime: number,
+  model: string,
+  status: 'success' | 'error',
+  tokensIn: number,
+  tokensOut: number,
+  streaming: boolean,
+  firstTokenMs?: number,
+  totalChunks?: number,
+  error?: string,
+): void {
+  const duration = Date.now() - startTime
+
+  const logData: Record<string, unknown> = {
+    type: status === 'error' ? 'api_call_error' : 'api_call_end',
+    correlationId,
+    model,
+    duration_ms: duration,
+    status,
+    tokens_in: tokensIn,
+    tokens_out: tokensOut,
+    streaming,
+  }
+
+  if (firstTokenMs !== undefined) {
+    logData.first_token_ms = firstTokenMs
+  }
+
+  if (totalChunks !== undefined) {
+    logData.total_chunks = totalChunks
+  }
+
+  if (error) {
+    logData.error = error
+  }
+
+  logForDebugging(
+    JSON.stringify(logData),
+    { level: status === 'error' ? 'error' : 'debug' },
+  )
+}
\ No newline at end of file
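The intended pairing of the two logging calls looks like the sketch below; `callProvider` is a hypothetical stand-in for the real fetch path in `openaiShim.ts`, so only the `requestLogging` imports are taken from this PR:

```ts
import { logApiCallStart, logApiCallEnd } from './requestLogging.js'

// Hypothetical provider call, declared only to illustrate the pairing.
declare function callProvider(
  model: string,
): Promise<{ tokensIn: number; tokensOut: number }>

async function trackedCall(provider: string, model: string) {
  const { correlationId, startTime } = logApiCallStart(provider, model)
  try {
    const result = await callProvider(model)
    // Success path: report token counts, non-streaming.
    logApiCallEnd(correlationId, startTime, model, 'success',
      result.tokensIn, result.tokensOut, false)
    return result
  } catch (err) {
    // Error path: firstTokenMs and totalChunks are skipped via undefined.
    logApiCallEnd(correlationId, startTime, model, 'error',
      0, 0, false, undefined, undefined, String(err))
    throw err
  }
}
```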
diff --git a/src/utils/streamingOptimizer.test.ts b/src/utils/streamingOptimizer.test.ts
new file mode 100644
index 00000000..40414b08
--- /dev/null
+++ b/src/utils/streamingOptimizer.test.ts
@@ -0,0 +1,61 @@
+import { describe, expect, it, beforeEach } from 'bun:test'
+import {
+  createStreamState,
+  processStreamChunk,
+  flushStreamBuffer,
+  getStreamStats,
+} from './streamingOptimizer.js'
+
+describe('streamingOptimizer', () => {
+  let state: ReturnType<typeof createStreamState>
+
+  beforeEach(() => {
+    state = createStreamState()
+  })
+
+  describe('createStreamState', () => {
+    it('creates initial state with zero counts', () => {
+      expect(state.chunkCount).toBe(0)
+      expect(state.firstTokenTime).toBeNull()
+      expect(state.startTime).toBeGreaterThan(0)
+    })
+  })
+
+  describe('processStreamChunk', () => {
+    it('tracks first token time on first chunk', () => {
+      processStreamChunk(state, 'hello')
+      expect(state.firstTokenTime).not.toBeNull()
+      expect(state.chunkCount).toBe(1)
+    })
+
+    it('increments chunk count', () => {
+      processStreamChunk(state, 'chunk1')
+      processStreamChunk(state, 'chunk2')
+      expect(state.chunkCount).toBe(2)
+    })
+  })
+
+  describe('getStreamStats', () => {
+    it('returns zero values for empty stream', () => {
+      const stats = getStreamStats(state)
+      expect(stats.totalChunks).toBe(0)
+      expect(stats.firstTokenMs).toBeNull()
+      expect(stats.durationMs).toBeGreaterThanOrEqual(0)
+    })
+
+    it('returns correct stats after processing chunks', () => {
+      processStreamChunk(state, 'test')
+      const stats = getStreamStats(state)
+      expect(stats.totalChunks).toBe(1)
+      expect(stats.firstTokenMs).toBeGreaterThanOrEqual(0)
+      expect(stats.durationMs).toBeGreaterThanOrEqual(0)
+    })
+  })
+
+  describe('flushStreamBuffer', () => {
+    it('returns empty string (no-op)', () => {
+      const result = flushStreamBuffer(state)
+      expect(result).toBe('')
+    })
+  })
+})
\ No newline at end of file
diff --git a/src/utils/streamingOptimizer.ts b/src/utils/streamingOptimizer.ts
new file mode 100644
index 00000000..10bb33d3
--- /dev/null
+++ b/src/utils/streamingOptimizer.ts
@@ -0,0 +1,51 @@
+/**
+ * Streaming Stats Tracker
+ *
+ * Observational stats tracking for streaming responses.
+ * No buffering - purely tracks metrics for monitoring.
+ */
+
+export interface StreamStats {
+  totalChunks: number
+  firstTokenMs: number | null
+  durationMs: number
+}
+
+export interface StreamState {
+  chunkCount: number
+  firstTokenTime: number | null
+  startTime: number
+}
+
+export function createStreamState(): StreamState {
+  return {
+    chunkCount: 0,
+    firstTokenTime: null,
+    startTime: Date.now(),
+  }
+}
+
+export function processStreamChunk(state: StreamState, _chunk: string): void {
+  if (state.firstTokenTime === null) {
+    state.firstTokenTime = Date.now()
+  }
+  state.chunkCount++
+}
+
+export function flushStreamBuffer(_state: StreamState): string {
+  return '' // No-op - kept for API compatibility
+}
+
+export function getStreamStats(state: StreamState): StreamStats {
+  const now = Date.now()
+  // Time-to-first-token is measured from stream start, not from "now".
+  const firstTokenMs = state.firstTokenTime !== null
+    ? state.firstTokenTime - state.startTime
+    : null
+  const durationMs = now - state.startTime
+
+  return {
+    totalChunks: state.chunkCount,
+    firstTokenMs,
+    durationMs,
+  }
+}
\ No newline at end of file
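For streaming calls the two new modules are meant to compose: the stream state feeds the optional `firstTokenMs`/`totalChunks` parameters of `logApiCallEnd`. A simplified, illustrative consumption loop, under the assumption that token counts arrive separately via the usage chunk and real SSE parsing is omitted:

```ts
import { logApiCallStart, logApiCallEnd } from './requestLogging.js'
import {
  createStreamState,
  processStreamChunk,
  getStreamStats,
} from './streamingOptimizer.js'

// Illustrative only: the loop below is not the shim's actual reader loop.
async function consumeStream(
  provider: string,
  model: string,
  body: ReadableStream<Uint8Array>,
): Promise<void> {
  const { correlationId, startTime } = logApiCallStart(provider, model)
  const state = createStreamState()
  const decoder = new TextDecoder()
  const reader = body.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      // Each decoded chunk bumps the count and stamps the first-token time.
      processStreamChunk(state, decoder.decode(value, { stream: true }))
    }
    const stats = getStreamStats(state)
    logApiCallEnd(correlationId, startTime, model, 'success',
      0, 0, true, stats.firstTokenMs ?? undefined, stats.totalChunks)
  } finally {
    reader.releaseLock()
  }
}
```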