fix retry Codex and OpenAI fetches via proxy-aware helper (#720)
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import { APIError } from '@anthropic-ai/sdk'
|
import { APIError } from '@anthropic-ai/sdk'
|
||||||
|
import { fetchWithProxyRetry } from './fetchWithProxyRetry.js'
|
||||||
import type {
|
import type {
|
||||||
ResolvedCodexCredentials,
|
ResolvedCodexCredentials,
|
||||||
ResolvedProviderRequest,
|
ResolvedProviderRequest,
|
||||||
@@ -559,12 +560,15 @@ export async function performCodexRequest(options: {
|
|||||||
}
|
}
|
||||||
headers.originator ??= 'openclaude'
|
headers.originator ??= 'openclaude'
|
||||||
|
|
||||||
const response = await fetch(`${options.request.baseUrl}/responses`, {
|
const response = await fetchWithProxyRetry(
|
||||||
|
`${options.request.baseUrl}/responses`,
|
||||||
|
{
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers,
|
headers,
|
||||||
body: JSON.stringify(body),
|
body: JSON.stringify(body),
|
||||||
signal: options.signal,
|
signal: options.signal,
|
||||||
})
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
const errorBody = await response.text().catch(() => 'unknown error')
|
const errorBody = await response.text().catch(() => 'unknown error')
|
||||||
|
|||||||
86
src/services/api/fetchWithProxyRetry.test.ts
Normal file
86
src/services/api/fetchWithProxyRetry.test.ts
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
import { afterEach, beforeEach, expect, test } from 'bun:test'
|
||||||
|
|
||||||
|
import { _resetKeepAliveForTesting } from '../../utils/proxy.js'
|
||||||
|
import {
|
||||||
|
fetchWithProxyRetry,
|
||||||
|
isRetryableFetchError,
|
||||||
|
} from './fetchWithProxyRetry.js'
|
||||||
|
|
||||||
|
type FetchType = typeof globalThis.fetch
|
||||||
|
|
||||||
|
const originalFetch = globalThis.fetch
|
||||||
|
const originalEnv = {
|
||||||
|
HTTP_PROXY: process.env.HTTP_PROXY,
|
||||||
|
HTTPS_PROXY: process.env.HTTPS_PROXY,
|
||||||
|
}
|
||||||
|
|
||||||
|
function restoreEnv(key: 'HTTP_PROXY' | 'HTTPS_PROXY', value: string | undefined): void {
|
||||||
|
if (value === undefined) {
|
||||||
|
delete process.env[key]
|
||||||
|
} else {
|
||||||
|
process.env[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
process.env.HTTP_PROXY = 'http://127.0.0.1:15236'
|
||||||
|
delete process.env.HTTPS_PROXY
|
||||||
|
_resetKeepAliveForTesting()
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
globalThis.fetch = originalFetch
|
||||||
|
restoreEnv('HTTP_PROXY', originalEnv.HTTP_PROXY)
|
||||||
|
restoreEnv('HTTPS_PROXY', originalEnv.HTTPS_PROXY)
|
||||||
|
_resetKeepAliveForTesting()
|
||||||
|
})
|
||||||
|
|
||||||
|
test('isRetryableFetchError matches Bun socket-closed failures', () => {
|
||||||
|
expect(
|
||||||
|
isRetryableFetchError(
|
||||||
|
new Error(
|
||||||
|
'The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()',
|
||||||
|
),
|
||||||
|
),
|
||||||
|
).toBe(true)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('fetchWithProxyRetry retries once with keepalive disabled after socket closure', async () => {
|
||||||
|
const calls: Array<RequestInit | undefined> = []
|
||||||
|
|
||||||
|
globalThis.fetch = (async (_input, init) => {
|
||||||
|
calls.push(init)
|
||||||
|
if (calls.length === 1) {
|
||||||
|
throw new Error(
|
||||||
|
'The socket connection was closed unexpectedly. For more information, pass `verbose: true` in the second argument to fetch()',
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return new Response('ok')
|
||||||
|
}) as FetchType
|
||||||
|
|
||||||
|
const response = await fetchWithProxyRetry('https://example.com/search', {
|
||||||
|
method: 'POST',
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(await response.text()).toBe('ok')
|
||||||
|
expect(calls).toHaveLength(2)
|
||||||
|
expect((calls[0] as RequestInit & { proxy?: string }).proxy).toBe(
|
||||||
|
'http://127.0.0.1:15236',
|
||||||
|
)
|
||||||
|
expect((calls[0] as RequestInit).keepalive).toBeUndefined()
|
||||||
|
expect((calls[1] as RequestInit).keepalive).toBe(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('fetchWithProxyRetry does not retry non-network errors', async () => {
|
||||||
|
let attempts = 0
|
||||||
|
|
||||||
|
globalThis.fetch = (async () => {
|
||||||
|
attempts += 1
|
||||||
|
throw new Error('400 bad request')
|
||||||
|
}) as FetchType
|
||||||
|
|
||||||
|
await expect(fetchWithProxyRetry('https://example.com')).rejects.toThrow(
|
||||||
|
'400 bad request',
|
||||||
|
)
|
||||||
|
expect(attempts).toBe(1)
|
||||||
|
})
|
||||||
44
src/services/api/fetchWithProxyRetry.ts
Normal file
44
src/services/api/fetchWithProxyRetry.ts
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import { disableKeepAlive, getProxyFetchOptions } from '../../utils/proxy.js'
|
||||||
|
|
||||||
|
const RETRYABLE_FETCH_ERROR_PATTERN =
|
||||||
|
/socket connection was closed unexpectedly|ECONNRESET|EPIPE|socket hang up|Connection reset by peer|fetch failed/i
|
||||||
|
|
||||||
|
export function isRetryableFetchError(error: unknown): boolean {
|
||||||
|
if (!(error instanceof Error)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if (error.name === 'AbortError') {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return RETRYABLE_FETCH_ERROR_PATTERN.test(error.message)
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function fetchWithProxyRetry(
|
||||||
|
input: string | URL | Request,
|
||||||
|
init?: RequestInit,
|
||||||
|
options?: { forAnthropicAPI?: boolean; maxAttempts?: number },
|
||||||
|
): Promise<Response> {
|
||||||
|
const maxAttempts = Math.max(1, options?.maxAttempts ?? 2)
|
||||||
|
let lastError: unknown
|
||||||
|
|
||||||
|
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||||
|
try {
|
||||||
|
return await fetch(input, {
|
||||||
|
...init,
|
||||||
|
...getProxyFetchOptions({
|
||||||
|
forAnthropicAPI: options?.forAnthropicAPI,
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
lastError = error
|
||||||
|
if (attempt >= maxAttempts || !isRetryableFetchError(error)) {
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
disableKeepAlive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw lastError instanceof Error
|
||||||
|
? lastError
|
||||||
|
: new Error('Fetch failed without an error object')
|
||||||
|
}
|
||||||
@@ -47,6 +47,7 @@ import {
|
|||||||
type AnthropicUsage,
|
type AnthropicUsage,
|
||||||
type ShimCreateParams,
|
type ShimCreateParams,
|
||||||
} from './codexShim.js'
|
} from './codexShim.js'
|
||||||
|
import { fetchWithProxyRetry } from './fetchWithProxyRetry.js'
|
||||||
import {
|
import {
|
||||||
isLocalProviderUrl,
|
isLocalProviderUrl,
|
||||||
resolveRuntimeCodexCredentials,
|
resolveRuntimeCodexCredentials,
|
||||||
@@ -1431,7 +1432,7 @@ class OpenAIShimMessages {
|
|||||||
const maxAttempts = isGithub ? GITHUB_429_MAX_RETRIES : 1
|
const maxAttempts = isGithub ? GITHUB_429_MAX_RETRIES : 1
|
||||||
let response: Response | undefined
|
let response: Response | undefined
|
||||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||||
response = await fetch(chatCompletionsUrl, fetchInit)
|
response = await fetchWithProxyRetry(chatCompletionsUrl, fetchInit)
|
||||||
if (response.ok) {
|
if (response.ok) {
|
||||||
return response
|
return response
|
||||||
}
|
}
|
||||||
@@ -1504,7 +1505,7 @@ class OpenAIShimMessages {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const responsesResponse = await fetch(responsesUrl, {
|
const responsesResponse = await fetchWithProxyRetry(responsesUrl, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers,
|
headers,
|
||||||
body: JSON.stringify(responsesBody),
|
body: JSON.stringify(responsesBody),
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import { z } from 'zod/v4'
|
|||||||
import { getFeatureValue_CACHED_MAY_BE_STALE } from '../../services/analytics/growthbook.js'
|
import { getFeatureValue_CACHED_MAY_BE_STALE } from '../../services/analytics/growthbook.js'
|
||||||
import { queryModelWithStreaming } from '../../services/api/claude.js'
|
import { queryModelWithStreaming } from '../../services/api/claude.js'
|
||||||
import { collectCodexCompletedResponse } from '../../services/api/codexShim.js'
|
import { collectCodexCompletedResponse } from '../../services/api/codexShim.js'
|
||||||
|
import { fetchWithProxyRetry } from '../../services/api/fetchWithProxyRetry.js'
|
||||||
import {
|
import {
|
||||||
resolveCodexApiCredentials,
|
resolveCodexApiCredentials,
|
||||||
resolveProviderRequest,
|
resolveProviderRequest,
|
||||||
@@ -314,7 +315,7 @@ async function runCodexWebSearch(
|
|||||||
body.reasoning = request.reasoning
|
body.reasoning = request.reasoning
|
||||||
}
|
}
|
||||||
|
|
||||||
const response = await fetch(`${request.baseUrl}/responses`, {
|
const response = await fetchWithProxyRetry(`${request.baseUrl}/responses`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
headers: {
|
headers: {
|
||||||
'Content-Type': 'application/json',
|
'Content-Type': 'application/json',
|
||||||
|
|||||||
Reference in New Issue
Block a user