""" Claude Code Agent Runtime ========================= Wraps the Claude Code CLI as an alternative AI brain for Aetheel. Like OpenCodeRuntime, this provides a subprocess-based interface to an AI coding agent. Claude Code uses the `claude` CLI from Anthropic. Usage: from agent.claude_runtime import ClaudeCodeRuntime, ClaudeCodeConfig runtime = ClaudeCodeRuntime() response = runtime.chat("What is Python?", conversation_id="slack-thread-123") """ import json import logging import os import shutil import subprocess import time from dataclasses import dataclass, field # Re-use AgentResponse and SessionStore from opencode_runtime from agent.opencode_runtime import AgentResponse, SessionStore logger = logging.getLogger("aetheel.agent.claude") # --------------------------------------------------------------------------- # Rate Limit Detection # --------------------------------------------------------------------------- _RATE_LIMIT_PATTERNS = [ "rate limit", "rate_limit", "too many requests", "429", "quota exceeded", "usage limit", "capacity", "overloaded", "credit balance", "billing", "exceeded your", "max usage", ] def _is_rate_limited(text: str) -> bool: """Check if an error message indicates a rate limit or quota issue.""" lower = text.lower() return any(pattern in lower for pattern in _RATE_LIMIT_PATTERNS) # --------------------------------------------------------------------------- # CLI Resolution # --------------------------------------------------------------------------- def _resolve_claude_command(explicit: str | None = None) -> str: """ Resolve the claude binary path. Checks common install locations since subprocesses may not have the same PATH as the user's shell. """ if explicit: resolved = shutil.which(explicit) if resolved: return resolved return explicit # Let subprocess fail with a clear error # 1. Try PATH first found = shutil.which("claude") if found: return found # 2. Common install locations home = os.path.expanduser("~") candidates = [ os.path.join(home, ".claude", "bin", "claude"), os.path.join(home, ".local", "bin", "claude"), "/usr/local/bin/claude", os.path.join(home, ".npm-global", "bin", "claude"), ] for path in candidates: if os.path.isfile(path) and os.access(path, os.X_OK): return path return "claude" # Fallback — will error at runtime if not found # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- @dataclass class ClaudeCodeConfig: """Configuration for the Claude Code runtime.""" command: str = "" model: str | None = None # e.g., "claude-sonnet-4-20250514" timeout_seconds: int = 120 max_turns: int = 3 # Limit tool use turns for faster responses workspace_dir: str | None = None system_prompt: str | None = None session_ttl_hours: int = 24 # claude -p flags output_format: str = "json" # "json", "text", or "stream-json" # Permission settings allowed_tools: list[str] = field(default_factory=lambda: [ "Bash", "Read", "Write", "Edit", "Glob", "Grep", "WebSearch", "WebFetch", "Task", "TaskOutput", "TaskStop", "Skill", "TeamCreate", "TeamDelete", "SendMessage", ]) # Whether to disable all tool use (pure conversation mode) no_tools: bool = False # Default: tools enabled @classmethod def from_env(cls) -> "ClaudeCodeConfig": """Create config from environment variables.""" return cls( command=os.environ.get("CLAUDE_COMMAND", ""), model=os.environ.get("CLAUDE_MODEL"), timeout_seconds=int(os.environ.get("CLAUDE_TIMEOUT", "120")), max_turns=int(os.environ.get("CLAUDE_MAX_TURNS", "3")), workspace_dir=os.environ.get("CLAUDE_WORKSPACE"), system_prompt=os.environ.get("CLAUDE_SYSTEM_PROMPT"), no_tools=os.environ.get("CLAUDE_NO_TOOLS", "false").lower() == "true", ) # --------------------------------------------------------------------------- # Claude Code Runtime # --------------------------------------------------------------------------- class ClaudeCodeRuntime: """ Claude Code Agent Runtime — alternative AI brain for Aetheel. Uses the `claude` CLI in non-interactive mode (`claude -p`). Supports: - Non-interactive execution with `-p` flag - JSON output parsing with `--output-format json` - Session continuity with `--continue` and `--session-id` - System prompt injection with `--system-prompt` - Model selection with `--model` - Tool restriction with `--allowedTools` or `--disallowedTools` """ def __init__(self, config: ClaudeCodeConfig | None = None): self._config = config or ClaudeCodeConfig() self._config.command = _resolve_claude_command(self._config.command) self._sessions = SessionStore() # Validate on init self._validate_installation() logger.info( f"ClaudeCodeRuntime initialized: " f"command={self._config.command}, " f"model={self._config.model or 'default'}" ) def chat( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, ) -> AgentResponse: """ Send a message to Claude Code and get a response. This is the main entry point, matching OpenCodeRuntime.chat(). """ start = time.monotonic() try: result = self._run_claude(message, conversation_id, system_prompt) result.duration_ms = int((time.monotonic() - start) * 1000) return result except Exception as e: logger.error(f"Claude runtime error: {e}", exc_info=True) return AgentResponse( text="", error=str(e), duration_ms=int((time.monotonic() - start) * 1000), ) def get_status(self) -> dict: """Get the runtime status (for the /status command).""" return { "mode": "claude-code", "model": self._config.model or "default", "provider": "anthropic", "active_sessions": self._sessions.count, "claude_available": self._is_claude_available(), } def cleanup_sessions(self) -> int: """Clean up stale sessions. Returns count removed.""" return self._sessions.cleanup(self._config.session_ttl_hours) # ------------------------------------------------------------------- # Core: Run claude CLI # ------------------------------------------------------------------- def _run_claude( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, ) -> AgentResponse: """ Run `claude -p ` and parse the response. Claude Code's `-p` flag runs in non-interactive (print) mode: - Processes the message - Returns the response - Exits immediately """ args = self._build_cli_args(message, conversation_id, system_prompt) logger.info( f"Claude exec: claude -p " f"(prompt_chars={len(message)}, " f"session={conversation_id or 'new'})" ) try: result = subprocess.run( args, capture_output=True, text=True, timeout=self._config.timeout_seconds, cwd=self._config.workspace_dir or os.getcwd(), env=self._build_cli_env(), ) stdout = result.stdout.strip() stderr = result.stderr.strip() if result.returncode != 0: error_text = stderr or stdout or "Claude CLI command failed" logger.error( f"Claude failed (code={result.returncode}): {error_text[:200]}" ) return AgentResponse( text="", error=f"Claude Code error: {error_text[:500]}", rate_limited=_is_rate_limited(error_text), ) # Parse the output response_text, session_id, usage = self._parse_output(stdout) if not response_text and stdout.strip(): # Only fall back to raw output if it doesn't look like JSON events # (which would leak internal lifecycle data to the user) if not stdout.strip().startswith("{"): response_text = stdout # Store session mapping if session_id and conversation_id: self._sessions.set(conversation_id, session_id) # Detect rate limiting from error text rate_limited = False if not response_text and stderr: rate_limited = _is_rate_limited(stderr) if usage and usage.get("is_error"): rate_limited = rate_limited or _is_rate_limited( usage.get("error_text", "") ) return AgentResponse( text=response_text, session_id=session_id, model=self._config.model, provider="anthropic", usage=usage, rate_limited=rate_limited, ) except subprocess.TimeoutExpired: logger.error( f"Claude timeout after {self._config.timeout_seconds}s" ) return AgentResponse( text="", error=f"Request timed out after {self._config.timeout_seconds}s", ) def _build_cli_args( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, ) -> list[str]: """ Build CLI arguments for `claude -p`. Claude Code CLI flags: -p, --print Non-interactive mode (print and exit) --output-format json | text | stream-json --model Model to use --system-prompt System prompt (claude supports this natively!) --continue Continue the most recent session --session-id Resume a specific session --max-turns Limit agentic turns --allowedTools Restrict tool access --disallowedTools Block specific tools """ args = [self._config.command, "-p"] # Model selection if self._config.model: args.extend(["--model", self._config.model]) # Output format args.extend(["--output-format", self._config.output_format]) # System prompt — claude supports this natively (unlike opencode) prompt = system_prompt or self._config.system_prompt if prompt: args.extend(["--system-prompt", prompt]) # Session continuity existing_session = None if conversation_id: existing_session = self._sessions.get(conversation_id) if existing_session: args.extend(["--session-id", existing_session, "--continue"]) # Max turns for tool use if self._config.max_turns: args.extend(["--max-turns", str(self._config.max_turns)]) # Tool restrictions if self._config.no_tools: # Disable all tools for pure conversation args.extend(["--allowedTools", ""]) elif self._config.allowed_tools: for tool in self._config.allowed_tools: args.extend(["--allowedTools", tool]) # The message (positional arg, must come last) args.append(message) return args def _build_cli_env(self) -> dict[str, str]: """Build environment variables for the CLI subprocess.""" env = os.environ.copy() return env def _parse_output(self, stdout: str) -> tuple[str, str | None, dict | None]: """ Parse claude CLI output. With --output-format json, claude returns a JSON object: { "type": "result", "subtype": "success", "cost_usd": 0.003, "is_error": false, "duration_ms": 1234, "duration_api_ms": 1100, "num_turns": 1, "result": "The response text...", "session_id": "abc123-..." } With --output-format text, it returns plain text. """ if not stdout.strip(): return "", None, None # Try JSON format first try: data = json.loads(stdout) if isinstance(data, dict): # Standard JSON response text = data.get("result", "") session_id = data.get("session_id") # Extract usage stats usage = { "cost_usd": data.get("cost_usd", 0), "num_turns": data.get("num_turns", 0), "duration_ms": data.get("duration_ms", 0), "duration_api_ms": data.get("duration_api_ms", 0), "is_error": data.get("is_error", False), "subtype": data.get("subtype", ""), } if data.get("is_error"): error_msg = text or data.get("error", "Unknown error") usage["error_text"] = error_msg logger.warning(f"Claude returned error: {error_msg[:200]}") return f"⚠️ {error_msg}", session_id, usage return text, session_id, usage except json.JSONDecodeError: pass # Try JSONL (stream-json) format text_parts = [] session_id = None usage = None for line in stdout.splitlines(): line = line.strip() if not line: continue try: event = json.loads(line) if isinstance(event, dict): event_type = event.get("type", "") if event_type == "result": text_parts.append(event.get("result", "")) session_id = event.get("session_id", session_id) usage = { "cost_usd": event.get("cost_usd", 0), "num_turns": event.get("num_turns", 0), "duration_ms": event.get("duration_ms", 0), "duration_api_ms": event.get("duration_api_ms", 0), "is_error": event.get("is_error", False), } elif event_type == "assistant" and "message" in event: # Extract text from content blocks msg = event["message"] if "content" in msg: for block in msg["content"]: if block.get("type") == "text": text_parts.append(block.get("text", "")) session_id = event.get("session_id", session_id) # Silently skip non-content events (step_start, step_finish, # system, tool_use, tool_result, etc.) — these are internal # lifecycle events that should never reach the user. except json.JSONDecodeError: # Not JSON — could be plain text mixed in; only include if # it doesn't look like a truncated JSON blob. if not line.startswith("{") and not line.startswith("["): text_parts.append(line) continue if text_parts: return "\n".join(text_parts), session_id, usage # Fallback: treat as plain text, but strip any JSON-like lines # to prevent raw event objects from leaking to the user. plain_lines = [] for line in stdout.splitlines(): stripped = line.strip() if stripped and not stripped.startswith("{"): plain_lines.append(line) if plain_lines: return "\n".join(plain_lines), None, None # Everything was JSON events with no extractable text logger.warning("Claude output contained only non-content JSON events") return "", None, None # ------------------------------------------------------------------- # Validation # ------------------------------------------------------------------- def _validate_installation(self) -> None: """Check that Claude Code CLI is installed and accessible.""" cmd = self._config.command if not cmd: logger.warning("Claude Code command is empty") return resolved = shutil.which(cmd) if resolved: logger.info(f"Claude Code found: {resolved}") else: logger.warning( f"Claude Code CLI not found at '{cmd}'. " "Install with: npm install -g @anthropic-ai/claude-code" ) def _is_claude_available(self) -> bool: """Check if Claude Code CLI is available.""" try: result = subprocess.run( [self._config.command, "--version"], capture_output=True, text=True, timeout=5, ) return result.returncode == 0 except Exception: return False