Files
orcs-code/atomic_chat_provider.py
Misha Skvortsov 577e654ae7 feat: add support for Atomic Chat provider
- Introduced a new provider profile for Atomic Chat, allowing it to be used alongside existing providers.
- Updated `package.json` to include a new development script for launching Atomic Chat.
- Modified `smart_router.py` to recognize Atomic Chat as a local provider that does not require an API key.
- Enhanced provider discovery and launch scripts to handle Atomic Chat, including model listing and connection checks.
- Added tests to ensure proper environment setup and behavior for Atomic Chat profiles.

This update expands the functionality of the application to support local LLMs via Atomic Chat, improving versatility for users.
2026-04-02 10:37:54 +03:00

147 lines
5.2 KiB
Python

"""
atomic_chat_provider.py
-----------------------
Adds native Atomic Chat support to openclaude.
Lets Claude Code route requests to any locally-running model via
Atomic Chat (Apple Silicon only) at 127.0.0.1:1337.
Atomic Chat exposes an OpenAI-compatible API, so messages are forwarded
directly without translation.
Usage (.env):
PREFERRED_PROVIDER=atomic-chat
ATOMIC_CHAT_BASE_URL=http://127.0.0.1:1337
"""
import httpx
import json
import logging
import os
from typing import AsyncIterator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the Atomic Chat server. Read once, at import time, from the
# ATOMIC_CHAT_BASE_URL environment variable; falls back to the local
# default http://127.0.0.1:1337 when the variable is unset.
ATOMIC_CHAT_BASE_URL = os.getenv("ATOMIC_CHAT_BASE_URL", "http://127.0.0.1:1337")
def _api_url(path: str) -> str:
    """Build the full URL of an endpoint under Atomic Chat's ``/v1`` prefix.

    Args:
        path: Endpoint path below ``/v1``, including its leading slash
            (for example ``"/models"`` or ``"/chat/completions"``).

    Returns:
        ``ATOMIC_CHAT_BASE_URL`` followed by ``/v1`` and *path*.
    """
    # A base URL configured with a trailing slash ("http://host:1337/")
    # would otherwise produce "http://host:1337//v1/...".
    return f"{ATOMIC_CHAT_BASE_URL.rstrip('/')}/v1{path}"
async def check_atomic_chat_running() -> bool:
    """Report whether Atomic Chat answers on its models endpoint.

    Returns:
        True when ``GET /v1/models`` comes back with HTTP 200 (client
        timeout: 3 seconds); False for any other status code or when any
        exception is raised while probing.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as http:
            probe = await http.get(_api_url("/models"))
    except Exception:
        # Best-effort probe: every failure simply means "not running".
        return False
    else:
        return probe.status_code == 200
async def list_atomic_chat_models() -> list[str]:
    """Fetch the ids of the models listed by Atomic Chat.

    Returns:
        The ``id`` of every entry under the ``data`` key of the
        ``GET /v1/models`` response (client timeout: 5 seconds). Any
        failure is logged as a warning and yields an empty list instead of
        raising.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as http:
            reply = await http.get(_api_url("/models"))
            reply.raise_for_status()
            listing = reply.json()
        model_ids = []
        for entry in listing.get("data", []):
            model_ids.append(entry["id"])
        return model_ids
    except Exception as e:
        logger.warning(f"Could not list Atomic Chat models: {e}")
        return []
async def atomic_chat(
    model: str,
    messages: list[dict],
    system: str | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
) -> dict:
    """Run one non-streaming chat completion against Atomic Chat.

    The request is posted to the OpenAI-compatible ``/v1/chat/completions``
    endpoint and the reply is reshaped into an Anthropic Messages-style
    response dict.

    Args:
        model: Model identifier, forwarded unchanged and echoed in the result.
        messages: Chat messages to send; the list is copied, never mutated.
        system: Optional system prompt; when non-empty it is sent as a
            leading ``{"role": "system"}`` message.
        max_tokens: Forwarded as the request's ``max_tokens``.
        temperature: Forwarded as the request's ``temperature``.

    Returns:
        A dict with the keys ``id``, ``type``, ``role``, ``content`` (a
        single text block), ``model``, ``stop_reason``, ``stop_sequence``
        and ``usage`` (``input_tokens`` / ``output_tokens``).

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    chat_messages = list(messages)
    if system:
        chat_messages.insert(0, {"role": "system", "content": system})
    payload = {
        "model": model,
        "messages": chat_messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stream": False,
    }
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(_api_url("/chat/completions"), json=payload)
        resp.raise_for_status()
        data = resp.json()

    # `choices`, `message`, `content` and `usage` may each be absent, JSON
    # null, or (for `choices`) an empty list; `or` covers all of those where
    # a plain .get() default only covers "absent".
    choices = data.get("choices") or [{}]
    choice = choices[0] or {}
    message = choice.get("message") or {}
    assistant_text = message.get("content") or ""
    usage = data.get("usage") or {}
    # OpenAI reports truncation at the token limit as finish_reason
    # "length"; the Anthropic-style equivalent is "max_tokens". Every other
    # finish reason keeps the previous "end_turn".
    stop_reason = "max_tokens" if choice.get("finish_reason") == "length" else "end_turn"
    return {
        "id": data.get("id", "msg_atomic_chat"),
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": assistant_text}],
        "model": model,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
    }
async def atomic_chat_stream(
model: str,
messages: list[dict],
system: str | None = None,
max_tokens: int = 4096,
temperature: float = 1.0,
) -> AsyncIterator[str]:
chat_messages = list(messages)
if system:
chat_messages.insert(0, {"role": "system", "content": system})
payload = {
"model": model,
"messages": chat_messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": True,
}
yield "event: message_start\n"
yield f'data: {json.dumps({"type": "message_start", "message": {"id": "msg_atomic_chat_stream", "type": "message", "role": "assistant", "content": [], "model": model, "stop_reason": None, "usage": {"input_tokens": 0, "output_tokens": 0}}})}\n\n'
yield "event: content_block_start\n"
yield f'data: {json.dumps({"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}})}\n\n'
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", _api_url("/chat/completions"), json=payload) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if not line or not line.startswith("data: "):
continue
raw = line[len("data: "):]
if raw.strip() == "[DONE]":
break
try:
chunk = json.loads(raw)
delta = chunk.get("choices", [{}])[0].get("delta", {})
delta_text = delta.get("content", "")
if delta_text:
yield "event: content_block_delta\n"
yield f'data: {json.dumps({"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": delta_text}})}\n\n'
finish_reason = chunk.get("choices", [{}])[0].get("finish_reason")
if finish_reason:
usage = chunk.get("usage", {})
yield "event: content_block_stop\n"
yield f'data: {json.dumps({"type": "content_block_stop", "index": 0})}\n\n'
yield "event: message_delta\n"
yield f'data: {json.dumps({"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": None}, "usage": {"output_tokens": usage.get("completion_tokens", 0)}})}\n\n'
yield "event: message_stop\n"
yield f'data: {json.dumps({"type": "message_stop"})}\n\n'
break
except json.JSONDecodeError:
continue