""" OpenCode Agent Runtime ====================== Wraps OpenCode CLI as the AI brain for Aetheel, inspired by OpenClaw's cli-runner.ts. Two modes of operation: 1. **SDK Mode** (preferred) — Connects to a running `opencode serve` instance via the official Python SDK (`opencode-ai`). Persistent sessions, low latency. 2. **CLI Mode** (fallback) — Spawns `opencode run` as a subprocess for each request. No persistent server needed, but higher per-request latency. Architecture (modeled after OpenClaw): - OpenClaw's `cli-runner.ts` runs CLI agents as subprocesses with configurable backends (claude-cli, codex-cli, etc.) via `runCommandWithTimeout()`. - OpenClaw's `cli-backends.ts` defines backend configs with command, args, output format, model aliases, session handling, etc. - We replicate this pattern for OpenCode, but leverage OpenCode's `serve` mode and its Python SDK for a cleaner integration. Session Management: - Each Slack thread maps to an OpenCode session (via `conversation_id`). - Sessions are created on first message and reused for follow-ups. - This mirrors OpenClaw's session isolation strategy. Usage: from agent.opencode_runtime import OpenCodeRuntime runtime = OpenCodeRuntime(mode="sdk") response = runtime.chat("What is Python?", session_id="slack-thread-123") """ import json import logging import os import queue import shutil import sqlite3 import subprocess import threading import time from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Callable logger = logging.getLogger("aetheel.agent") # --------------------------------------------------------------------------- # Rate Limit Detection # --------------------------------------------------------------------------- _RATE_LIMIT_PATTERNS = [ "rate limit", "rate_limit", "too many requests", "429", "quota exceeded", "usage limit", "capacity", "overloaded", "credit balance", "billing", "exceeded your", "max usage", ] def _is_rate_limited(text: str) -> bool: """Check if an error message indicates a rate limit or quota issue.""" lower = text.lower() return any(pattern in lower for pattern in _RATE_LIMIT_PATTERNS) def _resolve_opencode_command(explicit: str | None = None) -> str: """ Resolve the opencode binary path. Python subprocesses don't source ~/.zshrc or ~/.bashrc, so paths like ~/.opencode/bin won't be in PATH. This function checks common install locations to find the binary automatically. Priority: 1. Explicit path (from OPENCODE_COMMAND env var) 2. shutil.which (already in system PATH) 3. ~/.opencode/bin/opencode (official installer default) 4. ~/.local/bin/opencode (common Linux/macOS location) 5. npm global installs (npx-style locations) """ cmd = explicit or "opencode" # If explicit path is absolute and exists, use it directly if os.path.isabs(cmd) and os.path.isfile(cmd): return cmd # Try system PATH first found = shutil.which(cmd) if found: return found # Check common install locations home = Path.home() candidates = [ home / ".opencode" / "bin" / "opencode", # official installer home / ".local" / "bin" / "opencode", # common Linux/macOS Path("/usr/local/bin/opencode"), # Homebrew / manual Path("/opt/homebrew/bin/opencode"), # Homebrew (Apple Silicon) ] for candidate in candidates: if candidate.is_file() and os.access(candidate, os.X_OK): logger.info(f"Auto-discovered opencode at: {candidate}") return str(candidate) # Return the original command (will fail at runtime with a clear error) return cmd # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- class RuntimeMode(Enum): """How the runtime connects to OpenCode.""" SDK = "sdk" # via opencode serve + Python SDK CLI = "cli" # via opencode run subprocess @dataclass class OpenCodeConfig: """ Configuration for the OpenCode runtime. Modeled after OpenClaw's CliBackendConfig in cli-backends.ts. """ # Connection mode: RuntimeMode = RuntimeMode.CLI server_url: str = "http://localhost:4096" server_password: str | None = None server_username: str = "opencode" # CLI settings (for CLI mode, mirroring OpenClaw's DEFAULT_CLAUDE_BACKEND) command: str = "opencode" timeout_seconds: int = 120 # Model model: str | None = None # e.g., "anthropic/claude-sonnet-4-20250514" provider: str | None = None # e.g., "anthropic" # Agent behavior system_prompt: str | None = None workspace_dir: str | None = None format: str = "json" # output format: "default" (formatted) or "json" (raw events) agent: str | None = None # OpenCode agent name (from `opencode agent list`) attach_url: str | None = None # Attach to running server to avoid MCP cold boot # Session auto_create_sessions: bool = True session_ttl_hours: int = 24 @classmethod def from_env(cls) -> "OpenCodeConfig": """Create config from environment variables.""" mode_str = os.environ.get("OPENCODE_MODE", "cli").lower() mode = RuntimeMode.SDK if mode_str == "sdk" else RuntimeMode.CLI return cls( mode=mode, server_url=os.environ.get( "OPENCODE_SERVER_URL", "http://localhost:4096" ), server_password=os.environ.get("OPENCODE_SERVER_PASSWORD"), server_username=os.environ.get("OPENCODE_SERVER_USERNAME", "opencode"), command=_resolve_opencode_command( os.environ.get("OPENCODE_COMMAND") ), timeout_seconds=int(os.environ.get("OPENCODE_TIMEOUT", "120")), model=os.environ.get("OPENCODE_MODEL"), provider=os.environ.get("OPENCODE_PROVIDER"), system_prompt=os.environ.get("OPENCODE_SYSTEM_PROMPT"), workspace_dir=os.environ.get( "OPENCODE_WORKSPACE", os.environ.get("AETHEEL_WORKSPACE"), ), format=os.environ.get("OPENCODE_FORMAT", "json"), agent=os.environ.get("OPENCODE_AGENT"), attach_url=os.environ.get("OPENCODE_ATTACH"), ) # --------------------------------------------------------------------------- # Agent Response # --------------------------------------------------------------------------- @dataclass class AgentResponse: """Response from the agent runtime.""" text: str session_id: str | None = None model: str | None = None provider: str | None = None duration_ms: int = 0 usage: dict | None = None error: str | None = None rate_limited: bool = False @property def ok(self) -> bool: return self.error is None and bool(self.text.strip()) # --------------------------------------------------------------------------- # Session Store # --------------------------------------------------------------------------- class SessionStore: """ Maps external IDs (e.g., Slack thread_ts) to OpenCode session IDs. Mirrors OpenClaw's session isolation: each channel thread gets its own session. Backed by SQLite for persistence across restarts. Falls back to in-memory if the database cannot be opened. """ def __init__(self, db_path: str | None = None): self._lock = threading.Lock() self._db_path = db_path or os.path.join( os.path.expanduser("~/.aetheel"), "sessions.db" ) os.makedirs(os.path.dirname(self._db_path), exist_ok=True) self._init_db() def _init_db(self) -> None: """Initialize the sessions table.""" with sqlite3.connect(self._db_path) as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS sessions ( external_id TEXT PRIMARY KEY, session_id TEXT NOT NULL, source TEXT NOT NULL DEFAULT '', created_at REAL NOT NULL, last_used REAL NOT NULL ) """ ) conn.commit() logger.debug(f"Session store initialized: {self._db_path}") def _conn(self) -> sqlite3.Connection: conn = sqlite3.connect(self._db_path) conn.row_factory = sqlite3.Row return conn def get(self, external_id: str) -> str | None: """Get the OpenCode session ID for an external conversation ID.""" with self._lock: with self._conn() as conn: row = conn.execute( "SELECT session_id FROM sessions WHERE external_id = ?", (external_id,), ).fetchone() if row: conn.execute( "UPDATE sessions SET last_used = ? WHERE external_id = ?", (time.time(), external_id), ) conn.commit() return row["session_id"] return None def set(self, external_id: str, session_id: str, source: str = "") -> None: """Map an external ID to an OpenCode session ID.""" now = time.time() with self._lock: with self._conn() as conn: conn.execute( """ INSERT INTO sessions (external_id, session_id, source, created_at, last_used) VALUES (?, ?, ?, ?, ?) ON CONFLICT(external_id) DO UPDATE SET session_id = excluded.session_id, last_used = excluded.last_used """, (external_id, session_id, source, now, now), ) conn.commit() def remove(self, external_id: str) -> None: """Remove a session mapping.""" with self._lock: with self._conn() as conn: conn.execute( "DELETE FROM sessions WHERE external_id = ?", (external_id,), ) conn.commit() def cleanup(self, ttl_hours: int = 24) -> int: """Remove stale sessions older than ttl_hours. Returns count removed.""" cutoff = time.time() - (ttl_hours * 3600) with self._lock: with self._conn() as conn: cursor = conn.execute( "DELETE FROM sessions WHERE last_used < ?", (cutoff,), ) conn.commit() return cursor.rowcount def list_all(self) -> list[dict]: """List all active sessions (for diagnostics).""" with self._lock: with self._conn() as conn: rows = conn.execute( "SELECT external_id, session_id, source, created_at, last_used " "FROM sessions ORDER BY last_used DESC" ).fetchall() return [dict(row) for row in rows] @property def count(self) -> int: with self._lock: with self._conn() as conn: row = conn.execute("SELECT COUNT(*) as c FROM sessions").fetchone() return row["c"] if row else 0 # --------------------------------------------------------------------------- # Live Session — IPC Message Streaming # (Mirrors nanoclaw's MessageStream + IPC polling pattern) # --------------------------------------------------------------------------- @dataclass class LiveSession: """ A live, long-running agent session that accepts follow-up messages. In CLI mode: holds a running `opencode run` subprocess. Follow-up messages are queued and sent as new subprocess invocations that --continue the same session. In SDK mode: holds a session ID. Follow-up messages are sent via the SDK's session.prompt() to the same session. """ conversation_id: str session_id: str | None = None created_at: float = field(default_factory=time.time) last_activity: float = field(default_factory=time.time) message_count: int = 0 _lock: threading.Lock = field(default_factory=threading.Lock) def touch(self) -> None: """Update last activity timestamp.""" self.last_activity = time.time() @property def idle_seconds(self) -> float: return time.time() - self.last_activity class LiveSessionManager: """ Manages live sessions with idle timeout and cleanup. This is the IPC streaming layer — it keeps sessions alive between messages so follow-up messages go to the same agent context, mirroring nanoclaw's container-based session loop. """ def __init__(self, idle_timeout_seconds: int = 1800): self._sessions: dict[str, LiveSession] = {} self._lock = threading.Lock() self._idle_timeout = idle_timeout_seconds self._cleanup_thread: threading.Thread | None = None self._running = False def start(self) -> None: """Start the background cleanup thread.""" if self._running: return self._running = True self._cleanup_thread = threading.Thread( target=self._cleanup_loop, daemon=True, name="live-session-cleanup" ) self._cleanup_thread.start() def stop(self) -> None: """Stop the cleanup thread.""" self._running = False def get_or_create(self, conversation_id: str) -> LiveSession: """Get an existing live session or create a new one.""" with self._lock: session = self._sessions.get(conversation_id) if session: session.touch() return session session = LiveSession(conversation_id=conversation_id) self._sessions[conversation_id] = session logger.debug(f"Live session created: {conversation_id}") return session def get(self, conversation_id: str) -> LiveSession | None: """Get an existing live session (or None).""" with self._lock: return self._sessions.get(conversation_id) def remove(self, conversation_id: str) -> None: """Remove a live session.""" with self._lock: self._sessions.pop(conversation_id, None) def list_active(self) -> list[LiveSession]: """List all active live sessions.""" with self._lock: return list(self._sessions.values()) def _cleanup_loop(self) -> None: """Periodically remove idle sessions.""" while self._running: time.sleep(60) with self._lock: stale = [ cid for cid, s in self._sessions.items() if s.idle_seconds > self._idle_timeout ] for cid in stale: del self._sessions[cid] logger.info(f"Live session expired (idle): {cid}") # --------------------------------------------------------------------------- # OpenCode Runtime # --------------------------------------------------------------------------- class OpenCodeRuntime: """ OpenCode Agent Runtime — the AI brain for Aetheel. Inspired by OpenClaw's `runCliAgent()` in cli-runner.ts: - Resolves the CLI backend config - Builds CLI args (model, session, system prompt, etc.) - Runs the command with a timeout - Parses the JSON or text output - Returns structured results We adapt this for OpenCode's two modes: - SDK mode: uses the opencode-ai Python SDK to talk to `opencode serve` - CLI mode: spawns `opencode run` subprocess (like OpenClaw's approach) """ def __init__(self, config: OpenCodeConfig | None = None): self._config = config or OpenCodeConfig.from_env() self._sessions = SessionStore() self._live_sessions = LiveSessionManager( idle_timeout_seconds=self._config.session_ttl_hours * 3600 ) self._live_sessions.start() self._sdk_client = None self._sdk_available = False # Validate OpenCode is available self._validate_installation() # Try to initialize SDK client if in SDK mode if self._config.mode == RuntimeMode.SDK: self._init_sdk_client() logger.info( f"OpenCode runtime initialized " f"(mode={self._config.mode.value}, " f"model={self._config.model or 'default'})" ) # ------------------------------------------------------------------- # Public API # ------------------------------------------------------------------- def chat( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, files: list[str] | None = None, fork: bool = False, title: str | None = None, ) -> AgentResponse: """ Send a message to the AI agent and get a response. Args: message: The user's message text conversation_id: External conversation ID for session isolation system_prompt: Optional per-request system prompt override files: Optional file paths to attach (images, docs, etc.) fork: Fork the session instead of continuing linearly title: Human-readable session title """ started = time.time() if not message.strip(): return AgentResponse( text="", error="Empty message", duration_ms=0 ) try: if conversation_id: live = self._live_sessions.get(conversation_id) if live and live.session_id: logger.info( f"Follow-up message to live session " f"{conversation_id} (agent session={live.session_id[:8]}...)" ) live.touch() live.message_count += 1 if self._config.mode == RuntimeMode.SDK and self._sdk_available: result = self._chat_sdk(message, conversation_id, system_prompt) else: result = self._chat_cli( message, conversation_id, system_prompt, files=files, fork=fork, title=title, ) result.duration_ms = int((time.time() - started) * 1000) if conversation_id and result.session_id: live = self._live_sessions.get_or_create(conversation_id) live.session_id = result.session_id live.touch() live.message_count += 1 return result except Exception as e: duration_ms = int((time.time() - started) * 1000) logger.error(f"Agent error: {e}", exc_info=True) return AgentResponse( text="", error=str(e), duration_ms=duration_ms, ) def send_followup( self, message: str, conversation_id: str, system_prompt: str | None = None, ) -> AgentResponse: """ Send a follow-up message to an active live session. This is the IPC streaming entry point — it pipes a new message into an existing agent session, mirroring nanoclaw's MessageStream pattern where the host writes IPC files that get consumed by the running agent. If no live session exists, falls back to a regular chat() call which will create a new session or resume the persisted one. Args: message: The follow-up message text conversation_id: The conversation to send to system_prompt: Optional system prompt override Returns: AgentResponse with the AI's reply """ live = self._live_sessions.get(conversation_id) if not live or not live.session_id: logger.debug( f"No live session for {conversation_id}, " f"falling back to chat()" ) return self.chat(message, conversation_id, system_prompt) logger.info( f"IPC follow-up: conversation={conversation_id}, " f"session={live.session_id[:8]}..., " f"msg_count={live.message_count + 1}" ) live.touch() live.message_count += 1 # Route through the normal chat — the SessionStore already has the # mapping from conversation_id → opencode session_id, so the CLI # will use --continue --session, and the SDK will reuse the session. return self.chat(message, conversation_id, system_prompt) def close_session(self, conversation_id: str) -> bool: """ Close a live session explicitly. Mirrors nanoclaw's _close sentinel — signals that the session should end and resources should be freed. Returns True if a session was closed. """ live = self._live_sessions.get(conversation_id) if live: self._live_sessions.remove(conversation_id) logger.info( f"Live session closed: {conversation_id} " f"(messages={live.message_count}, " f"alive={int(live.idle_seconds)}s)" ) return True return False def get_status(self) -> dict: """Get the runtime status (for the /status command).""" status = { "mode": self._config.mode.value, "model": self._config.model or "default", "provider": self._config.provider or "auto", "agent": self._config.agent or "default", "active_sessions": self._sessions.count, "live_sessions": len(self._live_sessions.list_active()), "opencode_available": self._is_opencode_available(), "attach": self._config.attach_url or "none", } if self._config.mode == RuntimeMode.SDK: status["server_url"] = self._config.server_url status["sdk_connected"] = self._sdk_available return status def list_models(self, provider: str | None = None, verbose: bool = False) -> str: """ List available models via `opencode models [provider]`. Returns the raw output as a string. """ args = [self._config.command, "models"] if provider: args.append(provider) if verbose: args.append("--verbose") try: result = subprocess.run( args, capture_output=True, text=True, timeout=30, env=self._build_cli_env(), ) output = result.stdout.strip() or result.stderr.strip() return output or "No models found." except subprocess.TimeoutExpired: return "⚠️ Timed out fetching models." except FileNotFoundError: return "⚠️ OpenCode CLI not found." def get_stats(self, days: int | None = None) -> str: """ Get token usage and cost stats via `opencode stats`. Returns the raw output as a string. """ args = [self._config.command, "stats"] if days: args.extend(["--days", str(days)]) try: result = subprocess.run( args, capture_output=True, text=True, timeout=15, env=self._build_cli_env(), ) output = result.stdout.strip() or result.stderr.strip() return output or "No stats available." except subprocess.TimeoutExpired: return "⚠️ Timed out fetching stats." except FileNotFoundError: return "⚠️ OpenCode CLI not found." def list_agents(self) -> str: """ List available agents via `opencode agent list`. Returns the raw output as a string. """ args = [self._config.command, "agent", "list"] try: result = subprocess.run( args, capture_output=True, text=True, timeout=15, env=self._build_cli_env(), ) output = result.stdout.strip() or result.stderr.strip() return output or "No agents found." except subprocess.TimeoutExpired: return "⚠️ Timed out listing agents." except FileNotFoundError: return "⚠️ OpenCode CLI not found." def cleanup_sessions(self) -> int: """Clean up stale sessions. Returns count removed.""" return self._sessions.cleanup(self._config.session_ttl_hours) def reset_session(self, conversation_id: str) -> bool: """Clear both the persistent session mapping and the live session. Returns ``True`` if anything was cleared. """ had_live = self.close_session(conversation_id) had_persistent = self._sessions.get(conversation_id) is not None if had_persistent: self._sessions.remove(conversation_id) cleared = had_live or had_persistent if cleared: logger.info(f"Session reset: {conversation_id}") return cleared # ------------------------------------------------------------------- # CLI Mode: Subprocess execution # (mirrors OpenClaw's runCliAgent → runCommandWithTimeout pattern) # ------------------------------------------------------------------- def _chat_cli( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, files: list[str] | None = None, fork: bool = False, title: str | None = None, ) -> AgentResponse: """ Run OpenCode in CLI mode via `opencode run`. """ args = self._build_cli_args( message, conversation_id, system_prompt, files=files, fork=fork, title=title, ) logger.info( f"CLI exec: {self._config.command} run " f"(prompt_chars={len(message)}, " f"session={conversation_id or 'new'})" ) try: # Run the command — mirrors OpenClaw's runCommandWithTimeout() result = subprocess.run( args, capture_output=True, text=True, timeout=self._config.timeout_seconds, cwd=self._config.workspace_dir or os.getcwd(), env=self._build_cli_env(), ) stdout = result.stdout.strip() stderr = result.stderr.strip() if result.returncode != 0: # Mirror OpenClaw's error classification error_text = stderr or stdout or "CLI command failed" logger.error( f"CLI failed (code={result.returncode}): {error_text[:200]}" ) return AgentResponse( text="", error=f"OpenCode CLI error: {error_text[:500]}", rate_limited=_is_rate_limited(error_text), ) # Parse the output — mirrors OpenClaw's parseCliJson/parseCliJsonl response_text = self._parse_cli_output(stdout) if not response_text and stdout.strip(): # Only fall back to raw output if it doesn't look like JSON events # (which would leak internal lifecycle data to the user) if not stdout.strip().startswith("{"): response_text = stdout # Extract session ID if returned session_id = self._extract_session_id(stdout) if session_id and conversation_id: self._sessions.set(conversation_id, session_id) return AgentResponse( text=response_text, session_id=session_id, model=self._config.model, ) except subprocess.TimeoutExpired: logger.error( f"CLI timeout after {self._config.timeout_seconds}s" ) return AgentResponse( text="", error=f"Request timed out after {self._config.timeout_seconds}s", ) def _build_cli_args( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, files: list[str] | None = None, fork: bool = False, title: str | None = None, ) -> list[str]: """ Build CLI arguments for `opencode run`. Supports: --model, --session, --continue, --format, --agent, --attach, --file, --fork, --title """ args = [self._config.command, "run"] # Model selection if self._config.model: args.extend(["--model", self._config.model]) # Agent selection if self._config.agent: args.extend(["--agent", self._config.agent]) # Attach to running server (avoids MCP cold boot per request) if self._config.attach_url: args.extend(["--attach", self._config.attach_url]) # Session continuity existing_session = None if conversation_id: existing_session = self._sessions.get(conversation_id) if existing_session: args.extend(["--continue", "--session", existing_session]) if fork: args.append("--fork") # Session title if title: args.extend(["--title", title]) # File attachments if files: for f in files: args.extend(["--file", f]) # Output format if self._config.format and self._config.format in ("default", "json"): args.extend(["--format", self._config.format]) # Build the full prompt — prepend system prompt if provided if system_prompt: full_message = ( f"\n{system_prompt}\n\n\n" f"\n{message}\n" ) else: full_message = message args.append(full_message) return args def _build_cli_env(self) -> dict[str, str]: """ Build environment variables for the CLI subprocess. Note: OpenCode reads OPENCODE_* env vars as config overrides and tries to parse their values as JSON. We must NOT set arbitrary OPENCODE_* vars here — only pass through the parent environment. """ env = os.environ.copy() return env def _parse_cli_output(self, stdout: str) -> str: """ Parse CLI output to extract the response text. OpenCode's `--format json` emits JSONL (one JSON object per line): {"type":"step_start", "sessionID":"ses_...", "part":{...}} {"type":"text", "sessionID":"ses_...", "part":{"type":"text","text":"Hello!"}} {"type":"step_finish","sessionID":"ses_...", "part":{"type":"step-finish",...}} We extract text ONLY from "text" type events. All other event types (step_start, step_finish, tool_use, tool_result, etc.) are internal lifecycle events and must never be shown to the user. """ if not stdout.strip(): return "" # Track whether we found any JSON at all (to distinguish JSONL from plain text) found_json = False # Parse JSONL lines — collect text from "text" type events only lines = stdout.strip().split("\n") texts = [] for line in lines: line = line.strip() if not line: continue try: event = json.loads(line) found_json = True if not isinstance(event, dict): continue # OpenCode event format: extract text from part.text event_type = event.get("type", "") part = event.get("part", {}) if event_type == "text" and isinstance(part, dict): text = part.get("text", "") if text: texts.append(text) continue # Also handle "result" type events (Claude JSON format) if event_type == "result": text = event.get("result", "") if text: texts.append(text) continue # Also handle "assistant" type events (Claude stream-json) if event_type == "assistant" and "message" in event: msg = event["message"] if "content" in msg: for block in msg["content"]: if block.get("type") == "text": t = block.get("text", "") if t: texts.append(t) continue # Skip all other event types silently (step_start, step_finish, # tool_use, tool_result, system, etc.) except json.JSONDecodeError: # Not JSON — might be plain text output (--format default) # Only include if we haven't seen JSON yet (pure plain text mode) if not found_json: texts.append(line) if texts: return "\n".join(texts) # If we parsed JSON events but found no text, the response was # purely tool-use with no user-facing text. Return empty rather # than leaking raw JSON events. if found_json: logger.warning( "OpenCode output contained only non-text JSON events " "(no user-facing text found)" ) return "" # Final fallback for non-JSON output return stdout.strip() def _collect_text(self, value: Any) -> str: """ Recursively collect text from a parsed JSON object. Adapted from OpenClaw's collectText() from helpers.ts, with awareness of OpenCode's event structure. """ if not value: return "" if isinstance(value, str): return value if isinstance(value, list): return "".join(self._collect_text(item) for item in value) if isinstance(value, dict): # Skip OpenCode event wrapper — dig into "part" first if "part" in value and isinstance(value["part"], dict): part = value["part"] if "text" in part and isinstance(part["text"], str): return part["text"] # Try common text fields if "content" in value and isinstance(value["content"], str): return value["content"] if "content" in value and isinstance(value["content"], list): return "".join( self._collect_text(item) for item in value["content"] ) if "message" in value and isinstance(value["message"], dict): return self._collect_text(value["message"]) if "result" in value: return self._collect_text(value["result"]) return "" def _extract_session_id(self, stdout: str) -> str | None: """ Extract session ID from CLI output. OpenCode includes sessionID in every JSONL event line: {"type":"text", "sessionID":"ses_abc123", ...} We grab it from the first event that has one. """ lines = stdout.strip().split("\n") for line in lines: line = line.strip() if not line: continue try: event = json.loads(line) if not isinstance(event, dict): continue # OpenCode format: top-level sessionID session_id = event.get("sessionID") if isinstance(session_id, str) and session_id.strip(): return session_id.strip() # Fallback: check nested part.sessionID part = event.get("part", {}) if isinstance(part, dict): session_id = part.get("sessionID") if isinstance(session_id, str) and session_id.strip(): return session_id.strip() except json.JSONDecodeError: continue return None # ------------------------------------------------------------------- # SDK Mode: OpenCode serve API # (enhanced version of CLI mode, using the official Python SDK) # ------------------------------------------------------------------- def _init_sdk_client(self) -> None: """Initialize the OpenCode Python SDK client.""" try: from opencode_ai import Opencode kwargs: dict[str, Any] = { "base_url": self._config.server_url, } if self._config.server_password: import httpx kwargs["http_client"] = httpx.Client( auth=( self._config.server_username, self._config.server_password, ) ) self._sdk_client = Opencode(**kwargs) # Test connectivity try: self._sdk_client.app.get() self._sdk_available = True logger.info( f"SDK connected to {self._config.server_url}" ) except Exception as e: logger.warning( f"SDK connection test failed: {e}. " f"Will fall back to CLI mode." ) self._sdk_available = False except ImportError: logger.warning( "opencode-ai SDK not installed. " "Install with: pip install opencode-ai. " "Falling back to CLI mode." ) self._sdk_available = False def _chat_sdk( self, message: str, conversation_id: str | None = None, system_prompt: str | None = None, ) -> AgentResponse: """ Chat using the OpenCode Python SDK. Uses the server API: 1. Create or reuse a session (POST /session) 2. Send a message (POST /session/:id/message → client.session.chat) 3. Parse the AssistantMessage response """ if not self._sdk_client: return self._chat_cli(message, conversation_id, system_prompt) try: # Resolve or create session session_id = None if conversation_id: session_id = self._sessions.get(conversation_id) if not session_id: # Create a new session session = self._sdk_client.session.create() session_id = session.id if conversation_id: self._sessions.set(conversation_id, session_id) logger.info(f"SDK: created session {session_id}") # Build message parts parts = [{"type": "text", "text": message}] # Build chat params chat_kwargs: dict[str, Any] = {"parts": parts} if self._config.model: chat_kwargs["model"] = self._config.model if system_prompt: chat_kwargs["system"] = system_prompt # Send message and get response logger.info( f"SDK chat: session={session_id[:8]}... " f"prompt_chars={len(message)}" ) response = self._sdk_client.session.chat( session_id, **chat_kwargs ) # Extract text from the AssistantMessage response response_text = self._extract_sdk_response_text(response) return AgentResponse( text=response_text, session_id=session_id, model=self._config.model, ) except Exception as e: logger.warning( f"SDK chat failed: {e}. Falling back to CLI mode." ) # Graceful fallback to CLI mode return self._chat_cli(message, conversation_id, system_prompt) def _extract_sdk_response_text(self, response: Any) -> str: """Extract text content from the SDK's AssistantMessage response.""" # The response is an AssistantMessage which has parts if hasattr(response, "parts"): texts = [] for part in response.parts: if hasattr(part, "text"): texts.append(part.text) elif hasattr(part, "content"): texts.append(str(part.content)) return "\n".join(texts).strip() # Fallback: try to get text directly if hasattr(response, "text"): return response.text.strip() # Last resort: stringify return str(response).strip() # ------------------------------------------------------------------- # Validation & Utilities # ------------------------------------------------------------------- def _validate_installation(self) -> None: """Check that OpenCode CLI is installed and accessible.""" cmd = self._config.command # If the resolved command doesn't exist, try resolving again if not os.path.isfile(cmd) and not shutil.which(cmd): resolved = _resolve_opencode_command() if resolved != "opencode" and os.path.isfile(resolved): self._config.command = resolved logger.info(f"Resolved opencode binary: {resolved}") else: logger.warning( f"'{cmd}' not found. " f"Install with: curl -fsSL https://opencode.ai/install | bash " f"or: npm install -g opencode-ai" ) if self._config.mode == RuntimeMode.CLI: logger.warning( "CLI mode requires opencode to be installed. " "If using SDK mode, set OPENCODE_MODE=sdk." ) def _is_opencode_available(self) -> bool: """Check if OpenCode CLI is available.""" try: result = subprocess.run( [self._config.command, "--version"], capture_output=True, text=True, timeout=5, ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError, OSError): return False # --------------------------------------------------------------------------- # System Prompt Builder # (Mirrors OpenClaw's buildSystemPrompt in cli-runner/helpers.ts) # --------------------------------------------------------------------------- def build_aetheel_system_prompt( user_name: str | None = None, channel_name: str | None = None, is_dm: bool = False, extra_context: str | None = None, ) -> str: """ Build the system prompt for Aetheel. Like OpenClaw's buildAgentSystemPrompt(), this constructs a comprehensive prompt that gives the AI its identity, capabilities, and context. """ lines = [ "You are Aetheel — a personal AI assistant that lives inside Slack.", "", "# Identity", "- Your name is Aetheel", "- You ARE a Slack bot — you are already running inside Slack right now", "- You have your own Slack bot token and can send messages to any channel", "- You have a persistent memory system with identity files (SOUL.md, USER.md, MEMORY.md)", "- You can read and update your memory files across sessions", "", "# Your Capabilities", "- **Direct messaging**: You are already in the user's Slack workspace — no setup needed", "- **Memory**: You have SOUL.md (your personality), USER.md (user profile), MEMORY.md (long-term memory)", "- **Session logs**: Conversations are automatically saved to daily/ session files", "- **Reminders**: You can schedule messages to be sent later using action tags (see below)", "", "# Action Tags", "You can perform actions by including special tags in your response.", "The system will parse these tags and execute the actions automatically.", "", "## Reminders", "To schedule a reminder, include this tag anywhere in your response:", "```", "[ACTION:remind||]", "```", "Example: `[ACTION:remind|2|Time to drink water! 💧]` — sends a Slack message in 2 minutes", "Example: `[ACTION:remind|30|Stand up and stretch! 🧘]` — sends a message in 30 minutes", "", "When scheduling a reminder, confirm to the user that it's been set,", "and include the action tag in your response (it will be hidden from the user).", "", "# Your Tools", "- You have access to shell commands, file operations, and web search", "- Use web search to look up current information when needed", "- You can read and write files in the workspace (~/.aetheel/workspace/)", "- You can execute shell commands for system tasks", "", "# Self-Modification", "- You can edit your own config at ~/.aetheel/config.json", "- You can create new skills by writing SKILL.md files to ~/.aetheel/workspace/skills//SKILL.md", "- You can update your identity files (SOUL.md, USER.md, MEMORY.md)", "- You can modify HEARTBEAT.md to change your periodic tasks", "- After editing config, tell the user to restart or use /reload", "", "# Subagents & Teams", "- You can spawn background subagents for long-running tasks using [ACTION:spawn|]", "- You can use Team tools (TeamCreate, SendMessage) for multi-agent coordination", "- Use /subagents to list active background tasks", "", "# Guidelines", "- Be helpful, concise, and friendly", "- Use Slack formatting (bold with *text*, code with `text`, etc.)", "- Keep responses focused and relevant", "- If you don't know something, say so honestly", "- Avoid extremely long responses unless asked for detail", "- NEVER ask for Slack tokens, webhook URLs, or API keys — you already have them", "- NEVER suggest the user 'set up' Slack — you ARE the Slack bot", "", "# Context", ] if user_name: lines.append(f"- You are chatting with: {user_name}") if channel_name and not is_dm: lines.append(f"- Channel: #{channel_name}") if is_dm: lines.append("- This is a direct message (private conversation)") if extra_context: lines.append("") lines.append(extra_context) return "\n".join(lines)