feat: add headless gRPC server for external agent integration (#278)

* gRPC Server

* gRPC fix

* Update proto

* fix: address PR review feedback for gRPC server

- Update bun.lock for new dependencies (frozen-lockfile CI fix)
- Add multi-turn session persistence via initialMessages
- Replace hardcoded done payload with real token counts
- Default bind to localhost instead of 0.0.0.0
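
A hedged usage sketch of the new default (these call sites are illustrative,
not part of this diff); GrpcServer.start() binds localhost unless a host is
passed explicitly:

    new GrpcServer().start()                  // binds localhost:50051
    new GrpcServer().start(50051, '0.0.0.0')  // explicit opt-in to external exposure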

* fix(grpc): startup parity, cancel interrupt, and cli text fallback

- Replace enableConfigs() with await init() in start-grpc.ts for full
  bootstrap parity with the main CLI (env vars, CA certs, mTLS, proxy,
  OAuth, Windows shell)
- Call engine.interrupt() before call.end() in the cancel handler so
  in-flight model/tool execution is actually stopped
- Show done.full_text in the CLI client when no text_chunk was received,
  preventing silent drops when streaming is unavailable
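
A minimal sketch of that CLI fallback, assuming the client tracks whether any
text_chunk arrived (variable names here are illustrative, not from this diff):

    let sawTextChunk = false
    stream.on('data', (msg) => {
      if (msg.text_chunk) {
        sawTextChunk = true
        process.stdout.write(msg.text_chunk.text)
      } else if (msg.done && !sawTextChunk && msg.done.full_text) {
        // Streaming was unavailable: print the final text once instead
        process.stdout.write(msg.done.full_text)
      }
    })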

* fix(grpc): wire session_id end-to-end and remove dead provider field

- Move session_id from ClientMessage into ChatRequest to fix a proto-loader
  oneofs encoding bug and make the field functional
- Implement an in-memory session store so reconnecting with the same
  session_id resumes conversation context across streams (see the client
  sketch after this list)
- Remove ChatRequest.provider — per-request provider routing requires
  global process.env mutation, unsafe for concurrent clients; provider
  is configured via env vars at server startup
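
A hedged sketch of the resumption contract from the client side (stream setup
omitted; message shapes follow the proto added in this PR):

    // Stream 1: establish context under a caller-chosen session_id
    stream1.write({ request: { message: 'My name is Ada.', session_id: 'sess-1' } })
    // ...stream1 ends...
    // Stream 2 (new connection): the server seeds initialMessages from its
    // in-memory store, so earlier turns are visible to the model again
    stream2.write({ request: { message: 'What is my name?', session_id: 'sess-1' } })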

* fix(grpc): mirror CLI auth bootstrap in start-grpc and fix tool_name field

scripts/start-grpc.ts now runs the same provider/auth bootstrap as the
normal CLI entrypoint: enableConfigs, safe env vars, Gemini/GitHub token
hydration, saved-profile resolution with warn-and-fallback, and provider
validation before the server binds.
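
A condensed sketch of that bootstrap order (helper names come from the commit
messages above; the exact import and signature of init() are assumptions):

    // scripts/start-grpc.ts (sketch)
    import { GrpcServer } from '../src/grpc/server.js'

    await init() // enableConfigs, safe env vars, token hydration, OAuth, etc.
    // ...saved-profile resolution and provider validation run here,
    // warning and falling back before the server ever binds...
    new GrpcServer().start(50051, 'localhost')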

ToolCallResult.tool_name was being populated with the tool_use_id UUID.
Added a toolNameById map (filled in canUseTool) so tool_name now carries
the actual tool name (e.g. "Bash"). The UUID moves to a new tool_use_id
field (proto field 4) for client-side correlation.

* fix(grpc): add tool_use_id to ToolCallStart and interrupt engine on stream close

Two blocker-level issues flagged in code review:

- ToolCallStart was missing tool_use_id, making it impossible for clients
  to correlate tool_start events with tool_result when the same tool runs
  multiple times. Added tool_use_id = 3 to the proto message and populated
  it from the toolUseID parameter in canUseTool (see the sketch after this
  list).

- On stream close without an explicit CancelSignal the server only nulled
  the engine reference, leaving the underlying model/tool work running
  as an orphan. Added engine.interrupt() in the call.on('end') handler
  to stop work immediately when the client disconnects.
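
A hedged client-side sketch of the correlation this enables (handler wiring
assumed; field names follow the proto in this PR):

    const inFlight = new Map<string, string>() // tool_use_id -> tool_name
    stream.on('data', (msg) => {
      if (msg.tool_start) {
        inFlight.set(msg.tool_start.tool_use_id, msg.tool_start.tool_name)
      } else if (msg.tool_result) {
        // Resolves correctly even when the same tool runs multiple times
        const name = inFlight.get(msg.tool_result.tool_use_id)
        console.log(`${name} finished:`, msg.tool_result.output)
        inFlight.delete(msg.tool_result.tool_use_id)
      }
    })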

* fix(grpc): resolve pending promises on disconnect and guard post-cancel writes

Four lifecycle and contract issues identified during proactive review:

- Pending permission Promises in canUseTool would hang forever if the
  client disconnected mid-stream. On call 'end', all pending resolvers
  are now called with 'no' so the engine can unblock and terminate.

- The done message and session save could fire after call.end() when
  a CancelSignal arrived mid-generation. Added an `interrupted` flag
  set on both cancel and stream close to gate all post-loop writes.

- The session map had no eviction policy, allowing unbounded memory
  growth. Capped at MAX_SESSIONS=1000 with FIFO eviction of the
  oldest entry.

- Field 3 was silently absent from ChatRequest. Added `reserved 3`
  to document the gap and prevent accidental reuse in the future.

* fix(grpc): reset previousMessages on each new request to prevent session history leak

previousMessages was declared at stream scope and only overwritten when
the incoming session_id already existed in the session store. A second
request on the same stream with a new session_id would silently inherit
the first request's conversation history in initialMessages instead of
starting fresh, violating the session contract.

Fix: reset previousMessages to [] at the start of each ChatRequest
before the session-store lookup.

* fix(grpc): reset interrupted flag between requests and guard against concurrent ChatRequest

Two stream-scoped state bugs found during proactive audit:

- The `interrupted` flag was never reset between requests on the same
  stream. If the first request was cancelled, all subsequent requests
  would silently skip the done message, causing the client to hang.

- A second ChatRequest arriving while the first was still processing
  would overwrite the engine reference, corrupting the lifecycle of
  both requests. The server now returns an ALREADY_EXISTS error instead.
  The engine is nulled after the for-await loop completes so subsequent
  requests can proceed normally.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Author: NikitaBabenko
Date:   2026-04-06 12:54:10 +03:00
Commit: 26eef92fe7 (parent 112df59117), committed via GitHub

8 changed files with 659 additions and 27 deletions

src/grpc/server.ts (new file, 252 lines)

@@ -0,0 +1,252 @@
import * as grpc from '@grpc/grpc-js'
import * as protoLoader from '@grpc/proto-loader'
import path from 'path'
import { randomUUID } from 'crypto'
import { QueryEngine } from '../QueryEngine.js'
import { getTools } from '../tools.js'
import { getDefaultAppState } from '../state/AppStateStore.js'
import { AppState } from '../state/AppState.js'
import { FileStateCache, READ_FILE_STATE_CACHE_SIZE } from '../utils/fileStateCache.js'

const PROTO_PATH = path.resolve(import.meta.dirname, '../proto/openclaude.proto')

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
})
const protoDescriptor = grpc.loadPackageDefinition(packageDefinition) as any
const openclaudeProto = protoDescriptor.openclaude.v1

const MAX_SESSIONS = 1000

export class GrpcServer {
  private server: grpc.Server
  private sessions: Map<string, any[]> = new Map()

  constructor() {
    this.server = new grpc.Server()
    this.server.addService(openclaudeProto.AgentService.service, {
      Chat: this.handleChat.bind(this),
    })
  }

  start(port: number = 50051, host: string = 'localhost') {
    this.server.bindAsync(
      `${host}:${port}`,
      grpc.ServerCredentials.createInsecure(),
      (error, boundPort) => {
        if (error) {
          console.error('Failed to start gRPC server', error)
          return
        }
        console.log(`gRPC Server running at ${host}:${boundPort}`)
      }
    )
  }
  private handleChat(call: grpc.ServerDuplexStream<any, any>) {
    let engine: QueryEngine | null = null
    let appState: AppState = getDefaultAppState()
    const fileCache: FileStateCache = new FileStateCache(READ_FILE_STATE_CACHE_SIZE, 25 * 1024 * 1024)
    // To handle ActionRequired (ask user for permission)
    const pendingRequests = new Map<string, (reply: string) => void>()
    // Accumulated messages from previous turns for multi-turn context
    let previousMessages: any[] = []
    let sessionId = ''
    let interrupted = false

    call.on('data', async (clientMessage) => {
      try {
        if (clientMessage.request) {
          if (engine) {
            call.write({
              error: {
                message: 'A request is already in progress on this stream',
                code: 'ALREADY_EXISTS'
              }
            })
            return
          }
          interrupted = false
          const req = clientMessage.request
          sessionId = req.session_id || ''
          previousMessages = []
          // Load previous messages from session store (cross-stream persistence)
          if (sessionId && this.sessions.has(sessionId)) {
            previousMessages = [...this.sessions.get(sessionId)!]
          }
          const toolNameById = new Map<string, string>()
          engine = new QueryEngine({
            cwd: req.working_directory || process.cwd(),
            tools: getTools(appState.toolPermissionContext), // Gets all available tools
            commands: [], // Slash commands
            mcpClients: [],
            agents: [],
            ...(previousMessages.length > 0 ? { initialMessages: previousMessages } : {}),
            includePartialMessages: true,
            canUseTool: async (tool, input, context, assistantMsg, toolUseID) => {
              if (toolUseID) {
                toolNameById.set(toolUseID, tool.name)
              }
              // Notify client of the tool call first
              call.write({
                tool_start: {
                  tool_name: tool.name,
                  arguments_json: JSON.stringify(input),
                  tool_use_id: toolUseID
                }
              })
              // Ask user for permission
              const promptId = randomUUID()
              const question = `Approve ${tool.name}?`
              call.write({
                action_required: {
                  prompt_id: promptId,
                  question,
                  type: 'CONFIRM_COMMAND'
                }
              })
              return new Promise((resolve) => {
                pendingRequests.set(promptId, (reply) => {
                  if (reply.toLowerCase() === 'yes' || reply.toLowerCase() === 'y') {
                    resolve({ behavior: 'allow' })
                  } else {
                    resolve({ behavior: 'deny', reason: 'User denied via gRPC' })
                  }
                })
              })
            },
            getAppState: () => appState,
            setAppState: (updater) => { appState = updater(appState) },
            readFileCache: fileCache,
            userSpecifiedModel: req.model,
            fallbackModel: req.model,
          })
          // Track accumulated response data for FinalResponse
          let fullText = ''
          let promptTokens = 0
          let completionTokens = 0
          const generator = engine.submitMessage(req.message)
          for await (const msg of generator) {
            if (msg.type === 'stream_event') {
              if (msg.event.type === 'content_block_delta' && msg.event.delta.type === 'text_delta') {
                call.write({
                  text_chunk: {
                    text: msg.event.delta.text
                  }
                })
                fullText += msg.event.delta.text
              }
            } else if (msg.type === 'user') {
              // Extract tool results
              const content = msg.message.content
              if (Array.isArray(content)) {
                for (const block of content) {
                  if (block.type === 'tool_result') {
                    let outputStr = ''
                    if (typeof block.content === 'string') {
                      outputStr = block.content
                    } else if (Array.isArray(block.content)) {
                      outputStr = block.content.map(c => c.type === 'text' ? c.text : '').join('\n')
                    }
                    call.write({
                      tool_result: {
                        tool_name: toolNameById.get(block.tool_use_id) ?? block.tool_use_id,
                        tool_use_id: block.tool_use_id,
                        output: outputStr,
                        is_error: block.is_error || false
                      }
                    })
                  }
                }
              }
            } else if (msg.type === 'result') {
              // Extract real token counts and final text from the result
              if (msg.subtype === 'success') {
                if (msg.result) {
                  fullText = msg.result
                }
                promptTokens = msg.usage?.input_tokens ?? 0
                completionTokens = msg.usage?.output_tokens ?? 0
              }
            }
          }
          if (!interrupted) {
            // Save messages for multi-turn context in subsequent requests
            previousMessages = [...engine.getMessages()]
            // Persist to session store for cross-stream resumption
            if (sessionId) {
              if (!this.sessions.has(sessionId) && this.sessions.size >= MAX_SESSIONS) {
                // Evict oldest session (Map preserves insertion order)
                this.sessions.delete(this.sessions.keys().next().value)
              }
              this.sessions.set(sessionId, previousMessages)
            }
            call.write({
              done: {
                full_text: fullText,
                prompt_tokens: promptTokens,
                completion_tokens: completionTokens
              }
            })
          }
          engine = null
        } else if (clientMessage.input) {
          const promptId = clientMessage.input.prompt_id
          const reply = clientMessage.input.reply
          if (pendingRequests.has(promptId)) {
            pendingRequests.get(promptId)!(reply)
            pendingRequests.delete(promptId)
          }
        } else if (clientMessage.cancel) {
          interrupted = true
          if (engine) {
            engine.interrupt()
          }
          call.end()
        }
      } catch (err: any) {
        console.error('Error processing stream:', err)
        call.write({
          error: {
            message: err.message || 'Internal server error',
            code: 'INTERNAL'
          }
        })
        call.end()
      }
    })

    call.on('end', () => {
      interrupted = true
      // Unblock any pending permission prompts so canUseTool can return
      for (const resolve of pendingRequests.values()) {
        resolve('no')
      }
      if (engine) {
        engine.interrupt()
      }
      engine = null
      pendingRequests.clear()
    })
  }
}

src/proto/openclaude.proto (new file, 101 lines)

@@ -0,0 +1,101 @@
syntax = "proto3";

package openclaude.v1;

// Main Agent Service
service AgentService {
  // Bidirectional stream: client sends tasks and answers to agent prompts,
  // server streams text tokens, tool states, and requests permissions.
  rpc Chat(stream ClientMessage) returns (stream ServerMessage);
}

// ---------------------------------------------------------
// MESSAGES FROM CLIENT (Input)
// ---------------------------------------------------------

message ClientMessage {
  oneof payload {
    // 1. Initial request (first message in the stream)
    ChatRequest request = 2;
    // 2. User response to an agent prompt (e.g., command confirmation)
    UserInput input = 3;
    // 3. Interrupt signal (if the user clicks "Stop generation")
    CancelSignal cancel = 4;
  }
}

message ChatRequest {
  string message = 1;
  string working_directory = 2; // Where the agent should execute commands
  reserved 3;                   // Reserved to prevent accidental reuse
  optional string model = 4;
  string session_id = 5;        // Non-empty = cross-stream session persistence
}

message UserInput {
  string reply = 1;     // Text response (e.g., "y", "no", or clarification)
  string prompt_id = 2; // ID of the prompt we are responding to
}

message CancelSignal {
  string reason = 1;
}

// ---------------------------------------------------------
// MESSAGES FROM SERVER (Output / Events)
// ---------------------------------------------------------

message ServerMessage {
  // Using oneof guarantees that only one type of event arrives at a time
  oneof event {
    TextChunk text_chunk = 1;           // Chunk of text from LLM
    ToolCallStart tool_start = 2;       // Agent started using a tool
    ToolCallResult tool_result = 3;     // Tool returned a result
    ActionRequired action_required = 4; // Agent requires human intervention
    FinalResponse done = 5;             // Generation successfully completed
    ErrorResponse error = 6;            // A critical error occurred
  }
}

// Stream text chunk
message TextChunk {
  string text = 1;
}

// Agent decided to use a tool (bash, read_file, etc.)
message ToolCallStart {
  string tool_name = 1;
  string arguments_json = 2; // Arguments in JSON format
  string tool_use_id = 3;    // Correlation ID matching ToolCallResult
}

// Result of tool execution
message ToolCallResult {
  string tool_name = 1;
  string output = 2;      // stdout/stderr or file contents
  bool is_error = 3;      // Did the command itself fail
  string tool_use_id = 4; // Correlation ID matching ToolCallStart
}

// Agent paused work and is waiting for user decision
message ActionRequired {
  string prompt_id = 1; // Client must return this ID in UserInput
  string question = 2;  // Question text (e.g., "Execute 'rm -rf /'?")
  enum ActionType {
    CONFIRM_COMMAND = 0;     // Yes/No
    REQUEST_INFORMATION = 1; // Text input
  }
  ActionType type = 3;
}

// Final statistics
message FinalResponse {
  string full_text = 1; // The entire generated text
  int32 prompt_tokens = 2;
  int32 completion_tokens = 3;
}

message ErrorResponse {
  string message = 1;
  string code = 2;
}
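
A minimal end-to-end client sketch against this proto (not part of the diff;
the proto path and the auto-approval policy are assumptions for illustration):

import * as grpc from '@grpc/grpc-js'
import * as protoLoader from '@grpc/proto-loader'

const def = protoLoader.loadSync('openclaude.proto', {
  keepCase: true, longs: String, enums: String, defaults: true, oneofs: true,
})
const proto = (grpc.loadPackageDefinition(def) as any).openclaude.v1

const client = new proto.AgentService('localhost:50051', grpc.credentials.createInsecure())
const stream = client.Chat()

stream.on('data', (msg: any) => {
  if (msg.text_chunk) {
    process.stdout.write(msg.text_chunk.text)
  } else if (msg.action_required) {
    // Demo only: auto-approve every tool; a real client should prompt the user
    stream.write({ input: { prompt_id: msg.action_required.prompt_id, reply: 'yes' } })
  } else if (msg.done) {
    console.log(`\n[${msg.done.prompt_tokens} in / ${msg.done.completion_tokens} out]`)
    stream.end()
  } else if (msg.error) {
    console.error(`server error (${msg.error.code}): ${msg.error.message}`)
    stream.end()
  }
})

stream.write({
  request: {
    message: 'List the files in this directory.',
    working_directory: process.cwd(),
    session_id: 'demo-session', // reuse on a later stream to resume context
  },
})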