Files
Aetheel/main.py
tanmay11k 82c2640481 feat: openclaw-style secrets (env.vars + \) and per-task model routing
- Replace python-dotenv with config.json env.vars block + \ substitution
- Add models section for per-task model routing (heartbeat, subagent, default)
- Heartbeat/subagent tasks can use different models/providers than main chat
- Remove python-dotenv from dependencies
- Update all docs to reflect new config approach
- Reorganize docs into project/ and research/ subdirectories
2026-02-20 23:49:05 -05:00

2089 lines
74 KiB
Python

#!/usr/bin/env python3
"""
Aetheel — Main Entry Point
============================
Starts the AI assistant with multi-channel adapters, memory, skills,
scheduled tasks, and subagent support.
Usage:
python main.py Start with Slack + AI handler
python main.py --telegram Also enable Telegram adapter
python main.py --discord Also enable Discord adapter
python main.py --claude Use Claude Code runtime
python main.py --test Echo handler for testing
python main.py --model anthropic/claude-sonnet-4-20250514
python main.py --log DEBUG Debug logging
"""
import argparse
import asyncio
import logging
import os
import re
import sys
import threading
from datetime import datetime
# Config handles secrets via config.json env.vars + ${VAR} substitution.
# No .env file needed.
from adapters.base import BaseAdapter, IncomingMessage
from adapters.slack_adapter import SlackAdapter
from agent.claude_runtime import ClaudeCodeConfig, ClaudeCodeRuntime
from agent.opencode_runtime import (
AgentResponse,
OpenCodeConfig,
OpenCodeRuntime,
RuntimeMode,
build_aetheel_system_prompt,
)
from agent.subagent import SubagentManager
from config import AetheelConfig, ModelRouteConfig, load_config, save_default_config, write_mcp_config, CONFIG_PATH
from heartbeat import HeartbeatRunner
from hooks import HookManager, HookEvent
from memory import MemoryManager
from memory.types import MemoryConfig
from scheduler import Scheduler
from scheduler.store import ScheduledJob
from skills import SkillsManager
logger = logging.getLogger("aetheel")
# Type alias for either runtime
AnyRuntime = OpenCodeRuntime | ClaudeCodeRuntime
# Global instances (initialized in main)
_runtime: AnyRuntime | None = None
_memory: MemoryManager | None = None
_skills: SkillsManager | None = None
_scheduler: Scheduler | None = None
_subagent_mgr: SubagentManager | None = None
_heartbeat: HeartbeatRunner | None = None
_hook_mgr: HookManager | None = None
_webhook_receiver = None # WebhookReceiver | None
_adapters: dict[str, BaseAdapter] = {} # source_name -> adapter
# Runtime config (stored for subagent factory)
_use_claude: bool = False
_cli_args: argparse.Namespace | None = None
# Usage tracking
_usage_stats: dict = {
"total_requests": 0,
"total_cost_usd": 0.0,
"total_duration_ms": 0,
"requests_by_engine": {"opencode": 0, "claude": 0},
"cost_by_engine": {"opencode": 0.0, "claude": 0.0},
"rate_limit_hits": 0,
"failovers": 0,
"last_rate_limit": None, # ISO timestamp
"session_start": datetime.now().isoformat(),
}
# Regex for parsing action tags from AI responses
_ACTION_RE = re.compile(r"\[ACTION:remind\|(\d+)\|(.+?)\]", re.DOTALL)
_CRON_RE = re.compile(r"\[ACTION:cron\|([\d\*/,\- ]+)\|(.+?)\]", re.DOTALL)
_SPAWN_RE = re.compile(r"\[ACTION:spawn\|(.+?)\]", re.DOTALL)
# ---------------------------------------------------------------------------
# Message Handlers
# ---------------------------------------------------------------------------
def echo_handler(msg: IncomingMessage) -> str:
"""Simple echo handler for testing."""
response_lines = [
f"👋 *Aetheel received your message!*",
"",
f"📝 *Text:* {msg.text}",
f"👤 *From:* {msg.user_name} (`{msg.user_id}`)",
f"📍 *Channel:* {msg.channel_name} (`{msg.channel_id}`)",
f"💬 *Source:* {msg.source}",
f"🧵 *ConvID:* `{msg.conversation_id[:15]}...`",
f"🕐 *Time:* {msg.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}",
"",
f"_This is an echo response from the Aetheel test handler._",
]
return "\n".join(response_lines)
def _build_context(msg: IncomingMessage) -> str:
"""
Build full context to inject into the system prompt.
Combines:
- Identity files (SOUL.md, USER.md, MEMORY.md)
- Relevant memory search results
- Relevant skills for this message
- Available skills summary
"""
global _memory, _skills
sections: list[str] = []
# ── Identity: SOUL.md ──
if _memory:
soul = _memory.read_soul()
if soul:
sections.append(f"# Your Identity (SOUL.md)\n\n{soul}")
# ── User profile: USER.md ──
user = _memory.read_user()
if user:
sections.append(f"# About the User (USER.md)\n\n{user}")
# ── Long-term memory: MEMORY.md ──
ltm = _memory.read_long_term_memory()
if ltm:
sections.append(f"# Long-Term Memory (MEMORY.md)\n\n{ltm}")
# ── Relevant memory search results ──
try:
results = asyncio.run(_memory.search(msg.text, max_results=3, min_score=0.2))
if results:
snippets = []
for r in results:
if r.path in ("SOUL.md", "USER.md", "MEMORY.md"):
continue
snippets.append(
f"**{r.path}** (lines {r.start_line}-{r.end_line}, "
f"relevance {r.score:.0%}):\n{r.snippet[:500]}"
)
if snippets:
sections.append(
"# Relevant Memory Context\n\n"
+ "\n\n---\n\n".join(snippets)
)
except Exception as e:
logger.debug(f"Memory search failed: {e}")
# ── Skills context ──
if _skills:
# Inject matching skill instructions
skill_context = _skills.get_context(msg.text)
if skill_context:
sections.append(skill_context)
# Always show available skills summary
skills_summary = _skills.get_all_context()
if skills_summary:
sections.append(skills_summary)
# ── Discord channel history context ──
history_context = msg.raw_event.get("history_context", "")
if history_context:
sections.append(
f"# Recent Channel History\n\n{history_context}"
)
return "\n\n---\n\n".join(sections)
def ai_handler(msg: IncomingMessage) -> str:
"""
AI-powered handler — the heart of Aetheel.
Flow:
Message → context (memory + skills) → system prompt → runtime.chat()
→ action tags → session log → response
"""
global _runtime, _memory, _scheduler
if _runtime is None:
return "⚠️ AI runtime not initialized. Please restart the service."
text_lower = msg.text.lower().strip()
# Built-in commands (bypass AI)
# Commands work with or without "/" prefix.
# In Slack channels, "/" is intercepted as a native slash command,
# so users should use the prefix-free form (e.g. "engine claude").
# In DMs and other adapters, both forms work.
cmd = text_lower.lstrip("/")
if cmd in ("status", "ping"):
return _format_status()
if cmd == "help":
return _format_help()
if cmd == "time":
return f"🕐 Server time: *{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*"
if cmd in ("sessions",):
return _format_sessions()
# Cron management commands
if cmd.startswith("cron"):
return _handle_cron_command(cmd)
# Reload config and skills
if cmd in ("reload",):
cfg = load_config()
if _skills:
_skills.reload()
if _hook_mgr:
_hook_mgr.trigger(HookEvent(type="command", action="reload"))
return "🔄 Config and skills reloaded."
# List active subagents
if cmd in ("subagents",):
return _format_subagents()
# Runtime management commands
if cmd.startswith("engine") or cmd.startswith("runtime"):
return _handle_engine_command(msg.text.strip().lstrip("/"))
if cmd.startswith("model"):
return _handle_model_command(msg.text.strip().lstrip("/"))
if cmd.startswith("provider"):
return _handle_provider_command(msg.text.strip().lstrip("/"))
if cmd.startswith("config"):
return _handle_config_command(msg.text.strip().lstrip("/"))
if cmd.startswith("usage"):
return _handle_usage_command()
if cmd.startswith("models"):
return _handle_models_command(msg.text.strip().lstrip("/"))
if cmd.startswith("stats"):
return _handle_stats_command(msg.text.strip().lstrip("/"))
if cmd.startswith("agents") or cmd == "agent list":
return _handle_agents_command()
if cmd.startswith("mcp"):
return _handle_mcp_command(msg.text.strip().lstrip("/"))
if cmd.startswith("skill"):
return _handle_skill_command(msg.text.strip().lstrip("/"))
# Detect natural language skill creation requests and handle locally
# instead of routing to the AI (which may fail or be slow).
_SKILL_CREATE_RE = re.compile(
r"(?:create|make|add|build|write)\s+(?:a\s+)?(?:new\s+)?skill\s+(?:for|to|that|about|called|named)\s+(.+)",
re.IGNORECASE,
)
skill_match = _SKILL_CREATE_RE.match(text_lower)
if skill_match:
description = skill_match.group(1).strip().rstrip(".")
# Derive a slug name from the description
name = re.sub(r"[^a-z0-9]+", "-", description.lower()).strip("-")[:40]
if name:
return _create_skill_from_description(name, description)
# Detect natural language skill rename requests
_SKILL_RENAME_RE = re.compile(
r"rename\s+(?:the\s+)?(?:this\s+)?(?:skill\s+)?(.+?)\s+to\s+(.+)",
re.IGNORECASE,
)
rename_match = _SKILL_RENAME_RE.match(text_lower)
if rename_match:
old_name = rename_match.group(1).strip()
new_name = rename_match.group(2).strip().lower().replace(" ", "-")
if old_name and new_name:
return _handle_skill_command(f"skill rename {old_name} {new_name}")
# Build context from memory + skills
context = _build_context(msg)
# Route to AI via runtime
system_prompt = build_aetheel_system_prompt(
user_name=msg.user_name,
channel_name=msg.channel_name,
is_dm=msg.is_dm,
extra_context=context,
)
response = _runtime.chat(
message=msg.text,
conversation_id=msg.conversation_id,
system_prompt=system_prompt,
)
# Track usage stats
_track_usage(response)
# Rate limit detection + auto-failover
if response.rate_limited:
failover_response = _handle_rate_limit(msg, system_prompt, response)
if failover_response is not None:
return failover_response
if not response.ok:
error_msg = response.error or "Unknown error"
logger.error(f"AI error: {error_msg}")
if "not found" in error_msg.lower() or "not installed" in error_msg.lower():
return (
"⚠️ AI CLI is not available.\n"
"Check the runtime installation docs."
)
if "timeout" in error_msg.lower():
return (
"⏳ The AI took too long to respond. "
"Try a shorter or simpler question."
)
return f"⚠️ AI error: {error_msg[:200]}"
logger.info(
f"🤖 AI response: {len(response.text)} chars, "
f"{response.duration_ms}ms"
f"{', $' + str(response.usage.get('cost_usd', 0)) if response.usage else ''}"
)
# Parse and execute action tags (reminders, cron, spawn)
reply_text = _process_action_tags(response.text, msg)
# If the AI may have created/modified skills (via file tools), reload them
# so that `skill list` reflects the changes immediately.
if _skills and any(
kw in text_lower
for kw in ("skill", "create a skill", "new skill", "add a skill", "make a skill")
):
try:
_skills.reload()
logger.info("Skills reloaded after potential skill modification")
except Exception as e:
logger.debug(f"Skills reload after AI response failed: {e}")
# Log conversation to memory session log
if _memory:
try:
channel = "dm" if msg.is_dm else msg.channel_name or msg.source
_memory.log_session(
f"**User ({msg.user_name}):** {msg.text}\n\n"
f"**Aetheel:** {reply_text}",
channel=channel,
)
except Exception as e:
logger.debug(f"Session logging failed: {e}")
# Guard against empty responses (e.g. when AI did tool-only work
# and the parser correctly stripped all non-text events)
if not reply_text or not reply_text.strip():
return "✅ Done — I processed that but had no text to show."
return reply_text
# ---------------------------------------------------------------------------
# Action Tag Processing
# ---------------------------------------------------------------------------
def _process_action_tags(text: str, msg: IncomingMessage) -> str:
"""
Parse and execute action tags from the AI response.
Supports:
[ACTION:remind|<minutes>|<message>] → one-shot reminder
[ACTION:cron|<cron_expr>|<prompt>] → recurring cron job
[ACTION:spawn|<task description>] → background subagent
"""
cleaned = text
# ── Remind tags (one-shot) ──
for match in _ACTION_RE.finditer(text):
minutes_str, reminder_msg = match.group(1), match.group(2)
try:
minutes = int(minutes_str)
if _scheduler:
_scheduler.add_once(
delay_minutes=minutes,
prompt=reminder_msg.strip(),
channel_id=msg.channel_id,
channel_type=msg.source,
thread_id=msg.raw_event.get("thread_id"),
user_name=msg.user_name,
)
logger.info(
f"⏰ Reminder scheduled: '{reminder_msg.strip()[:50]}' "
f"in {minutes} min for {msg.source}/{msg.channel_name}"
)
except Exception as e:
logger.warning(f"Failed to schedule reminder: {e}")
cleaned = cleaned.replace(match.group(0), "").strip()
# ── Cron tags (recurring) ──
for match in _CRON_RE.finditer(text):
cron_expr, cron_prompt = match.group(1).strip(), match.group(2).strip()
try:
if _scheduler:
job_id = _scheduler.add_cron(
cron_expr=cron_expr,
prompt=cron_prompt,
channel_id=msg.channel_id,
channel_type=msg.source,
thread_id=msg.raw_event.get("thread_id"),
user_name=msg.user_name,
)
logger.info(
f"🔄 Cron scheduled: '{cron_prompt[:50]}' ({cron_expr}) "
f"job_id={job_id}"
)
except Exception as e:
logger.warning(f"Failed to schedule cron job: {e}")
cleaned = cleaned.replace(match.group(0), "").strip()
# ── Spawn tags (subagent) ──
for match in _SPAWN_RE.finditer(text):
spawn_task = match.group(1).strip()
try:
if _subagent_mgr:
task_id = _subagent_mgr.spawn(
task=spawn_task,
channel_id=msg.channel_id,
channel_type=msg.source,
thread_id=msg.raw_event.get("thread_id"),
user_name=msg.user_name,
)
logger.info(
f"🚀 Subagent spawned: '{spawn_task[:50]}' "
f"task_id={task_id}"
)
except Exception as e:
logger.warning(f"Failed to spawn subagent: {e}")
cleaned = cleaned.replace(match.group(0), "").strip()
return cleaned
# ---------------------------------------------------------------------------
# Cron Management Commands
# ---------------------------------------------------------------------------
def _handle_cron_command(text: str) -> str:
"""Handle /cron subcommands."""
global _scheduler
if not _scheduler:
return "⚠️ Scheduler not initialized."
parts = text.strip().split(maxsplit=2)
if len(parts) < 2 or parts[1] == "list":
jobs = _scheduler.list_jobs()
if not jobs:
return "📋 No scheduled jobs."
lines = ["📋 *Scheduled Jobs:*\n"]
for job in jobs:
kind = f"🔄 `{job.cron_expr}`" if job.is_recurring else "⏰ one-shot"
lines.append(
f"• `{job.id}` — {kind}{job.prompt[:60]}"
)
return "\n".join(lines)
if parts[1] == "remove" and len(parts) >= 3:
job_id = parts[2].strip()
if _scheduler.remove(job_id):
return f"✅ Job `{job_id}` removed."
return f"⚠️ Job `{job_id}` not found."
return (
"Usage: `cron list` or `cron remove <id>`"
)
# ---------------------------------------------------------------------------
# Runtime / Model / Provider Management Commands
# ---------------------------------------------------------------------------
def _handle_engine_command(text: str) -> str:
"""
Handle /engine (or /runtime) commands.
/engine Show current engine
/engine opencode Switch to OpenCode
/engine claude Switch to Claude Code
"""
global _runtime, _use_claude
parts = text.strip().split(maxsplit=1)
if len(parts) < 2:
engine = "claude" if _use_claude else "opencode"
status = _runtime.get_status() if _runtime else {}
model = status.get("model", "default")
provider = status.get("provider", "auto")
return (
f"🧠 *Current Engine:* `{engine}`\n"
f"• *Model:* `{model}`\n"
f"• *Provider:* `{provider}`\n"
f"\n"
f"Switch with: `/engine opencode` or `/engine claude`"
)
target = parts[1].strip().lower()
if target not in ("opencode", "claude"):
return "Usage: `/engine opencode` or `/engine claude`"
want_claude = target == "claude"
if want_claude == _use_claude:
return f"Already using `{target}`."
# Hot-swap the runtime
try:
_use_claude = want_claude
cfg = load_config()
cfg.runtime.engine = target
if want_claude:
new_config = ClaudeCodeConfig(
model=cfg.claude.model,
timeout_seconds=cfg.claude.timeout_seconds,
max_turns=cfg.claude.max_turns,
no_tools=cfg.claude.no_tools,
allowed_tools=cfg.claude.allowed_tools,
)
_runtime = ClaudeCodeRuntime(new_config)
else:
new_config = OpenCodeConfig(
mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
server_url=cfg.runtime.server_url,
timeout_seconds=cfg.runtime.timeout_seconds,
model=cfg.runtime.model,
provider=cfg.runtime.provider,
workspace_dir=cfg.runtime.workspace,
format=cfg.runtime.format,
agent=cfg.runtime.agent,
attach_url=cfg.runtime.attach,
)
_runtime = OpenCodeRuntime(new_config)
# Persist to config.json
_update_config_file({"runtime": {"engine": target}})
status = _runtime.get_status()
return (
f"✅ Switched to `{target}`\n"
f"• *Model:* `{status.get('model', 'default')}`\n"
f"• *Provider:* `{status.get('provider', 'auto')}`"
)
except Exception as e:
logger.error(f"Engine switch failed: {e}", exc_info=True)
return f"⚠️ Failed to switch engine: {e}"
def _handle_model_command(text: str) -> str:
"""
Handle /model commands.
/model Show current model
/model <name> Switch model (hot-swap)
/model anthropic/claude-sonnet-4-20250514
/model openai/gpt-4o
"""
global _runtime, _use_claude
parts = text.strip().split(maxsplit=1)
if len(parts) < 2:
status = _runtime.get_status() if _runtime else {}
return (
f"🧠 *Current Model:* `{status.get('model', 'default')}`\n"
f"• *Engine:* `{'claude' if _use_claude else 'opencode'}`\n"
f"• *Provider:* `{status.get('provider', 'auto')}`\n"
f"\n"
f"Switch with: `/model <model-name>`\n"
f"Examples:\n"
f"• `/model anthropic/claude-sonnet-4-20250514`\n"
f"• `/model openai/gpt-4o`\n"
f"• `/model google/gemini-2.5-pro`"
)
new_model = parts[1].strip()
if not new_model:
return "Usage: `/model <model-name>`"
try:
cfg = load_config()
if _use_claude:
cfg.claude.model = new_model
new_config = ClaudeCodeConfig(
model=new_model,
timeout_seconds=cfg.claude.timeout_seconds,
max_turns=cfg.claude.max_turns,
no_tools=cfg.claude.no_tools,
allowed_tools=cfg.claude.allowed_tools,
)
_runtime = ClaudeCodeRuntime(new_config)
_update_config_file({"claude": {"model": new_model}})
else:
cfg.runtime.model = new_model
new_config = OpenCodeConfig(
mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
server_url=cfg.runtime.server_url,
timeout_seconds=cfg.runtime.timeout_seconds,
model=new_model,
provider=cfg.runtime.provider,
workspace_dir=cfg.runtime.workspace,
format=cfg.runtime.format,
agent=cfg.runtime.agent,
attach_url=cfg.runtime.attach,
)
_runtime = OpenCodeRuntime(new_config)
_update_config_file({"runtime": {"model": new_model}})
return f"✅ Model switched to `{new_model}`"
except Exception as e:
logger.error(f"Model switch failed: {e}", exc_info=True)
return f"⚠️ Failed to switch model: {e}"
def _handle_provider_command(text: str) -> str:
"""
Handle /provider commands (OpenCode only).
/provider Show current provider
/provider anthropic Switch provider
/provider openai Switch provider
"""
global _runtime, _use_claude
if _use_claude:
return "Provider selection is only available with the OpenCode engine. Claude Code always uses Anthropic."
parts = text.strip().split(maxsplit=1)
if len(parts) < 2:
status = _runtime.get_status() if _runtime else {}
return (
f"🔌 *Current Provider:* `{status.get('provider', 'auto')}`\n"
f"\n"
f"Switch with: `/provider <name>`\n"
f"Examples: `anthropic`, `openai`, `google`, `auto`"
)
new_provider = parts[1].strip().lower()
try:
cfg = load_config()
cfg.runtime.provider = new_provider if new_provider != "auto" else None
new_config = OpenCodeConfig(
mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
server_url=cfg.runtime.server_url,
timeout_seconds=cfg.runtime.timeout_seconds,
model=cfg.runtime.model,
provider=new_provider if new_provider != "auto" else None,
workspace_dir=cfg.runtime.workspace,
format=cfg.runtime.format,
agent=cfg.runtime.agent,
attach_url=cfg.runtime.attach,
)
_runtime = OpenCodeRuntime(new_config)
_update_config_file({"runtime": {"provider": new_provider}})
return f"✅ Provider switched to `{new_provider}`"
except Exception as e:
logger.error(f"Provider switch failed: {e}", exc_info=True)
return f"⚠️ Failed to switch provider: {e}"
def _handle_config_command(text: str) -> str:
"""
Handle /config commands — view and edit config from chat.
/config Show key settings
/config show Show full config
/config set <key> <val> Set a config value
"""
import json as _json
parts = text.strip().split(maxsplit=2)
subcommand = parts[1] if len(parts) > 1 else "summary"
if subcommand == "show":
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
data = _json.load(f)
formatted = _json.dumps(data, indent=2)
if len(formatted) > 3000:
formatted = formatted[:3000] + "\n... (truncated)"
return f"```\n{formatted}\n```"
except Exception as e:
return f"⚠️ Failed to read config: {e}"
elif subcommand == "set" and len(parts) >= 3:
rest = parts[2].strip()
kv_parts = rest.split(maxsplit=1)
if len(kv_parts) < 2:
return "Usage: `/config set runtime.model anthropic/claude-sonnet-4-20250514`"
key_path = kv_parts[0]
value_str = kv_parts[1]
# Parse value (handle booleans, numbers, null, strings)
try:
value = _json.loads(value_str)
except _json.JSONDecodeError:
value = value_str
keys = key_path.split(".")
if len(keys) < 2:
return "Use dotted notation: `/config set runtime.model <value>`"
update = {}
current = update
for k in keys[:-1]:
current[k] = {}
current = current[k]
current[keys[-1]] = value
try:
_update_config_file(update)
return f"✅ Set `{key_path}` = `{value_str}`\nRun `/reload` to apply changes."
except Exception as e:
return f"⚠️ Failed to update config: {e}"
else:
cfg = load_config()
engine = cfg.runtime.engine
model = cfg.claude.model if engine == "claude" else cfg.runtime.model
provider = cfg.runtime.provider if engine == "opencode" else "anthropic"
lines = [
"⚙️ *Configuration Summary*",
"",
f"• *Engine:* `{engine}`",
f"• *Model:* `{model or 'default'}`",
f"• *Provider:* `{provider or 'auto'}`",
f"• *Mode:* `{cfg.runtime.mode}`",
f"• *Timeout:* `{cfg.runtime.timeout_seconds}s`",
"",
f"• *Slack:* {'' if cfg.slack.enabled else ''}",
f"• *Discord:* {'' if cfg.discord.enabled else ''}",
f"• *Telegram:* {'' if cfg.telegram.enabled else ''}",
f"• *WebChat:* {'' if cfg.webchat.enabled else ''}",
f"• *Webhooks:* {'' if cfg.webhooks.enabled else ''}",
"",
"Use `/config show` for full config, `/config set <key> <value>` to edit.",
"Use `/engine`, `/model`, `/provider` for live switching.",
]
return "\n".join(lines)
def _update_config_file(updates: dict) -> None:
"""
Merge updates into ~/.aetheel/config.json.
Deep merge so partial updates don't clobber existing keys.
"""
import json as _json
data = {}
if os.path.isfile(CONFIG_PATH):
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
data = _json.load(f)
def _deep_merge(base: dict, overlay: dict) -> dict:
for key, value in overlay.items():
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
_deep_merge(base[key], value)
else:
base[key] = value
return base
_deep_merge(data, updates)
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
_json.dump(data, f, indent=2)
logger.info(f"Config updated: {list(updates.keys())}")
# ---------------------------------------------------------------------------
# Usage Tracking, Rate Limit Failover, Notifications
# ---------------------------------------------------------------------------
def _track_usage(response: AgentResponse) -> None:
"""Track usage stats from an AI response."""
global _usage_stats
engine = "claude" if _use_claude else "opencode"
_usage_stats["total_requests"] += 1
_usage_stats["requests_by_engine"][engine] = (
_usage_stats["requests_by_engine"].get(engine, 0) + 1
)
_usage_stats["total_duration_ms"] += response.duration_ms
if response.usage:
cost = response.usage.get("cost_usd", 0) or 0
_usage_stats["total_cost_usd"] += cost
_usage_stats["cost_by_engine"][engine] = (
_usage_stats["cost_by_engine"].get(engine, 0) + cost
)
if response.rate_limited:
_usage_stats["rate_limit_hits"] += 1
_usage_stats["last_rate_limit"] = datetime.now().isoformat()
def _handle_rate_limit(
msg: IncomingMessage,
system_prompt: str,
original_response: AgentResponse,
) -> str | None:
"""
Handle a rate-limited response. Attempts auto-failover to the other
engine and notifies the user.
Returns the failover response text, or None if failover isn't possible.
"""
global _runtime, _use_claude, _usage_stats
current_engine = "claude" if _use_claude else "opencode"
other_engine = "opencode" if _use_claude else "claude"
logger.warning(
f"Rate limit hit on {current_engine}: {original_response.error}"
)
# Notify the user about the rate limit
notify_text = (
f"⚠️ *Rate limit reached on {current_engine}*\n"
f"{original_response.error or 'Usage limit exceeded.'}"
)
# Try auto-failover to the other engine
cfg = load_config()
failover_runtime = None
try:
if _use_claude:
# Failover: Claude → OpenCode
failover_runtime = OpenCodeRuntime(OpenCodeConfig(
mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
server_url=cfg.runtime.server_url,
timeout_seconds=cfg.runtime.timeout_seconds,
model=cfg.runtime.model,
provider=cfg.runtime.provider,
workspace_dir=cfg.runtime.workspace,
format=cfg.runtime.format,
agent=cfg.runtime.agent,
attach_url=cfg.runtime.attach,
))
else:
# Failover: OpenCode → Claude
failover_runtime = ClaudeCodeRuntime(ClaudeCodeConfig(
model=cfg.claude.model,
timeout_seconds=cfg.claude.timeout_seconds,
max_turns=cfg.claude.max_turns,
no_tools=cfg.claude.no_tools,
allowed_tools=cfg.claude.allowed_tools,
))
except Exception as e:
logger.warning(f"Failover engine ({other_engine}) not available: {e}")
return f"{notify_text}\n\n{other_engine} is not available for failover."
# Attempt the request on the failover engine
logger.info(f"Attempting failover: {current_engine}{other_engine}")
try:
failover_response = failover_runtime.chat(
message=msg.text,
conversation_id=msg.conversation_id,
system_prompt=system_prompt,
)
_track_usage(failover_response)
if failover_response.ok:
# Failover succeeded — switch the active runtime
_runtime = failover_runtime
_use_claude = not _use_claude
_usage_stats["failovers"] += 1
engine_name = "claude" if _use_claude else "opencode"
logger.info(f"Failover successful → now using {engine_name}")
# Process action tags on the failover response
reply_text = _process_action_tags(failover_response.text, msg)
return (
f"{notify_text}\n"
f"🔄 Auto-switched to *{other_engine}*.\n\n"
f"{reply_text}"
)
else:
# Failover also failed
return (
f"{notify_text}\n"
f"Failover to {other_engine} also failed: "
f"{failover_response.error or 'Unknown error'}"
)
except Exception as e:
logger.error(f"Failover request failed: {e}", exc_info=True)
return f"{notify_text}\nFailover to {other_engine} failed: {e}"
def _handle_usage_command() -> str:
"""
Handle the `usage` command — show LLM usage stats.
"""
global _usage_stats
total = _usage_stats["total_requests"]
cost = _usage_stats["total_cost_usd"]
duration = _usage_stats["total_duration_ms"]
rate_limits = _usage_stats["rate_limit_hits"]
failovers = _usage_stats["failovers"]
started = _usage_stats["session_start"]
lines = [
"📊 *Usage Stats* (since last restart)",
"",
f"• *Total Requests:* {total}",
f"• *Total Cost:* ${cost:.4f}",
f"• *Total Duration:* {duration / 1000:.1f}s",
f"• *Rate Limit Hits:* {rate_limits}",
f"• *Auto-Failovers:* {failovers}",
f"• *Session Start:* {started}",
"",
"*By Engine:*",
]
for engine in ("opencode", "claude"):
reqs = _usage_stats["requests_by_engine"].get(engine, 0)
eng_cost = _usage_stats["cost_by_engine"].get(engine, 0)
if reqs > 0:
lines.append(f"• *{engine}:* {reqs} requests, ${eng_cost:.4f}")
if _usage_stats["last_rate_limit"]:
lines.append(f"\n⚠️ Last rate limit: {_usage_stats['last_rate_limit']}")
engine = "claude" if _use_claude else "opencode"
status = _runtime.get_status() if _runtime else {}
lines.extend([
"",
f"*Active:* `{engine}` / `{status.get('model', 'default')}`",
])
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Models, Stats, Agents Commands
# ---------------------------------------------------------------------------
def _handle_models_command(text: str) -> str:
"""
Handle the `models` command — list available models from OpenCode.
models List all models
models <provider> List models for a specific provider
models --verbose Include metadata (costs, etc.)
"""
global _runtime, _use_claude
if _use_claude:
return "Model listing is only available with the OpenCode engine."
if not isinstance(_runtime, OpenCodeRuntime):
return "⚠️ OpenCode runtime not initialized."
parts = text.strip().split()
provider = None
verbose = False
for part in parts[1:]: # skip "models"
if part == "--verbose" or part == "-v":
verbose = True
elif not part.startswith("-"):
provider = part
output = _runtime.list_models(provider=provider, verbose=verbose)
# Wrap in code block for readability
if len(output) > 100:
return f"```\n{output[:3500]}\n```"
return output
def _handle_stats_command(text: str) -> str:
"""
Handle the `stats` command — show OpenCode token usage and cost stats.
stats All-time stats
stats 7 Stats for last 7 days
stats 30 Stats for last 30 days
"""
global _runtime, _use_claude
if _use_claude:
return "OpenCode stats are only available with the OpenCode engine. Use `usage` for Aetheel stats."
if not isinstance(_runtime, OpenCodeRuntime):
return "⚠️ OpenCode runtime not initialized."
parts = text.strip().split()
days = None
if len(parts) >= 2:
try:
days = int(parts[1])
except ValueError:
pass
output = _runtime.get_stats(days=days)
if len(output) > 100:
return f"```\n{output[:3500]}\n```"
return output
def _handle_agents_command() -> str:
"""Handle the `agents` command — list available OpenCode agents."""
global _runtime, _use_claude
if _use_claude:
return "Agent listing is only available with the OpenCode engine."
if not isinstance(_runtime, OpenCodeRuntime):
return "⚠️ OpenCode runtime not initialized."
output = _runtime.list_agents()
if len(output) > 100:
return f"```\n{output[:3500]}\n```"
return output
# ---------------------------------------------------------------------------
# MCP Server Management Commands
# ---------------------------------------------------------------------------
def _handle_mcp_command(text: str) -> str:
"""
Handle mcp commands — manage MCP servers from chat.
mcp list List configured servers
mcp add <name> <command> [args...] Add a server
mcp remove <name> Remove a server
mcp enable <name> Enable a disabled server
mcp disable <name> Disable a server
"""
import json as _json
parts = text.strip().split(maxsplit=3)
subcommand = parts[1] if len(parts) > 1 else "list"
cfg = load_config()
if subcommand == "list":
if not cfg.mcp.servers:
return "📦 No MCP servers configured.\n\nAdd one: `mcp add <name> <command> [args]`"
lines = ["📦 *MCP Servers:*\n"]
for name, srv in cfg.mcp.servers.items():
args_str = " ".join(srv.args) if srv.args else ""
lines.append(f"• `{name}` — `{srv.command} {args_str}`")
lines.append(f"\nManage: `mcp add`, `mcp remove <name>`")
return "\n".join(lines)
elif subcommand == "add" and len(parts) >= 3:
rest = text.strip().split(maxsplit=2)[2] # everything after "mcp add"
add_parts = rest.split()
name = add_parts[0]
command = add_parts[1] if len(add_parts) > 1 else "uvx"
args = add_parts[2:] if len(add_parts) > 2 else []
# Update config
server_data = {"command": command, "args": args, "env": {}}
_update_config_file({"mcp": {"servers": {name: server_data}}})
# Rewrite MCP config for the runtime
new_cfg = load_config()
write_mcp_config(new_cfg.mcp, new_cfg.memory.workspace, _use_claude)
return (
f"✅ MCP server `{name}` added\n"
f"• Command: `{command} {' '.join(args)}`\n"
f"Run `reload` to apply."
)
elif subcommand == "remove" and len(parts) >= 3:
name = parts[2].strip()
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
data = _json.load(f)
servers = data.get("mcp", {}).get("servers", {})
if name not in servers:
return f"⚠️ MCP server `{name}` not found."
del servers[name]
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
_json.dump(data, f, indent=2)
new_cfg = load_config()
write_mcp_config(new_cfg.mcp, new_cfg.memory.workspace, _use_claude)
return f"✅ MCP server `{name}` removed."
except Exception as e:
return f"⚠️ Failed to remove: {e}"
else:
return (
"Usage:\n"
"• `mcp list` — List servers\n"
"• `mcp add <name> <command> [args]` — Add a server\n"
"• `mcp remove <name>` — Remove a server\n"
"\nExamples:\n"
"• `mcp add brave-search uvx brave-search-mcp@latest`\n"
"• `mcp add filesystem npx @anthropic-ai/mcp-filesystem /home/user`\n"
"• `mcp add github uvx github-mcp-server`"
)
# ---------------------------------------------------------------------------
# Skills Management Commands
# ---------------------------------------------------------------------------
def _handle_skill_command(text: str) -> str:
"""
Handle skill commands — manage skills from chat.
skill list List loaded skills
skill show <name> Show a skill's content
skill create <name> Create a new skill interactively
skill remove <name> Remove a skill
skill reload Reload all skills
"""
global _skills
parts = text.strip().split(maxsplit=2)
subcommand = parts[1] if len(parts) > 1 else "list"
if subcommand == "list":
if not _skills or not _skills.skills:
return (
"🎯 No skills loaded.\n\n"
"Create one: `skill create <name>`\n"
"Or ask me naturally: \"Create a skill for checking weather\""
)
lines = ["🎯 *Skills:*\n"]
for s in _skills.skills:
triggers = ", ".join(s.triggers[:4])
lines.append(f"• `{s.name}` — {s.description}\n Triggers: {triggers}")
lines.append(f"\nManage: `skill show <name>`, `skill create <name>`, `skill remove <name>`")
return "\n".join(lines)
elif subcommand == "show" and len(parts) >= 3:
name = parts[2].strip()
if not _skills:
return "⚠️ Skills system not initialized."
for s in _skills.skills:
if s.name.lower() == name.lower():
content = s.body[:2000]
return (
f"🎯 *{s.name}*\n"
f"_{s.description}_\n"
f"Triggers: {', '.join(s.triggers)}\n\n"
f"```\n{content}\n```"
)
return f"⚠️ Skill `{name}` not found. Run `skill list` to see available skills."
elif subcommand == "create" and len(parts) >= 3:
name = parts[2].strip().lower().replace(" ", "-")
cfg = load_config()
workspace = os.path.expanduser(cfg.memory.workspace)
skill_dir = os.path.join(workspace, "skills", name)
skill_path = os.path.join(skill_dir, "SKILL.md")
if os.path.exists(skill_path):
return f"⚠️ Skill `{name}` already exists at `{skill_path}`"
os.makedirs(skill_dir, exist_ok=True)
template = (
f"---\n"
f"name: {name}\n"
f"description: TODO — describe what this skill does\n"
f"triggers: [{name}]\n"
f"---\n\n"
f"# {name.title()} Skill\n\n"
f"<!-- Instructions for the AI when this skill is triggered -->\n\n"
f"When the user asks about {name}:\n"
f"1. ...\n"
f"2. ...\n"
f"3. ...\n"
)
with open(skill_path, "w", encoding="utf-8") as f:
f.write(template)
return (
f"✅ Skill `{name}` created at:\n"
f"`{skill_path}`\n\n"
f"Edit the SKILL.md to add your instructions, then run `skill reload`.\n"
f"Or just tell me what the skill should do and I'll write it for you."
)
elif subcommand == "remove" and len(parts) >= 3:
name = parts[2].strip()
cfg = load_config()
workspace = os.path.expanduser(cfg.memory.workspace)
skill_dir = os.path.join(workspace, "skills", name)
if not os.path.isdir(skill_dir):
return f"⚠️ Skill `{name}` not found."
import shutil
shutil.rmtree(skill_dir)
if _skills:
_skills.reload()
return f"✅ Skill `{name}` removed."
elif subcommand == "rename" and len(parts) >= 3:
# Parse: skill rename <old> <new> OR skill rename <old> to <new>
rename_args = parts[2].strip()
rename_parts = re.split(r"\s+to\s+|\s+", rename_args, maxsplit=1)
if len(rename_parts) < 2:
return "Usage: `skill rename <old_name> <new_name>`"
old_name = rename_parts[0].strip()
new_name = rename_parts[1].strip().lower().replace(" ", "-")
cfg = load_config()
workspace = os.path.expanduser(cfg.memory.workspace)
old_dir = os.path.join(workspace, "skills", old_name)
new_dir = os.path.join(workspace, "skills", new_name)
if not os.path.isdir(old_dir):
return f"⚠️ Skill `{old_name}` not found. Run `skill list` to see available skills."
if os.path.isdir(new_dir):
return f"⚠️ Skill `{new_name}` already exists."
import shutil
shutil.move(old_dir, new_dir)
# Update the name field inside SKILL.md
skill_path = os.path.join(new_dir, "SKILL.md")
if os.path.isfile(skill_path):
try:
with open(skill_path, "r", encoding="utf-8") as f:
content = f.read()
content = re.sub(
r"^(name:\s*).*$",
rf"\g<1>{new_name}",
content,
count=1,
flags=re.MULTILINE,
)
with open(skill_path, "w", encoding="utf-8") as f:
f.write(content)
except Exception as e:
logger.warning(f"Failed to update name in SKILL.md: {e}")
if _skills:
_skills.reload()
return f"✅ Skill renamed: `{old_name}` → `{new_name}`"
elif subcommand == "reload":
if _skills:
loaded = _skills.reload()
return f"🔄 Reloaded {len(loaded)} skill(s)."
return "⚠️ Skills system not initialized."
else:
return (
"Usage:\n"
"• `skill list` — List loaded skills\n"
"• `skill show <name>` — Show skill content\n"
"• `skill create <name>` — Create a new skill template\n"
"• `skill rename <old> <new>` — Rename a skill\n"
"• `skill remove <name>` — Remove a skill\n"
"• `skill reload` — Reload all skills\n"
"\nYou can also create skills naturally:\n"
"\"Create a skill for checking stock prices\""
)
def _create_skill_from_description(name: str, description: str) -> str:
"""
Create a skill from a natural language description.
Generates a SKILL.md with sensible triggers derived from the description,
reloads the skills manager, and confirms to the user.
"""
global _skills
cfg = load_config()
workspace = os.path.expanduser(cfg.memory.workspace)
skill_dir = os.path.join(workspace, "skills", name)
skill_path = os.path.join(skill_dir, "SKILL.md")
if os.path.exists(skill_path):
return f"⚠️ Skill `{name}` already exists. Use `skill show {name}` to view it."
# Derive trigger words from the description
stop_words = {
"a", "an", "the", "for", "to", "at", "in", "on", "of", "and",
"or", "is", "it", "my", "me", "do", "get", "how", "what", "when",
"where", "that", "this", "with", "about", "checking", "check",
"current", "currently", "getting", "looking", "find", "finding",
}
words = re.findall(r"[a-z]+", description.lower())
triggers = [w for w in words if w not in stop_words and len(w) > 2]
# Deduplicate while preserving order
seen = set()
triggers = [t for t in triggers if not (t in seen or seen.add(t))]
if not triggers:
triggers = [name]
os.makedirs(skill_dir, exist_ok=True)
template = (
f"---\n"
f"name: {name}\n"
f"description: {description.capitalize()}\n"
f"triggers: [{', '.join(triggers)}]\n"
f"---\n\n"
f"# {name.replace('-', ' ').title()} Skill\n\n"
f"When the user asks about {description}:\n"
f"1. Use web search or available tools to find the information\n"
f"2. Present the results clearly and concisely\n"
f"3. Offer follow-up suggestions if relevant\n"
)
with open(skill_path, "w", encoding="utf-8") as f:
f.write(template)
# Reload so `skill list` picks it up immediately
if _skills:
_skills.reload()
return (
f"✅ Skill `{name}` created!\n"
f"Triggers: {', '.join(triggers)}\n\n"
f"Edit it: `skill show {name}`\n"
f"Or tell me what to change and I'll update it."
)
def _on_scheduled_job(job: ScheduledJob) -> None:
"""
Called by the scheduler when a job fires.
Creates a synthetic IncomingMessage and routes it through ai_handler,
then sends the response to the right channel.
Heartbeat jobs use a dedicated runtime when ``models.heartbeat`` is
configured, so cheap/local models can handle periodic tasks.
"""
logger.info(f"🔔 Scheduled job firing: {job.id}'{job.prompt[:50]}'")
is_heartbeat = job.channel_type == "heartbeat"
# Build a synthetic message
msg = IncomingMessage(
text=job.prompt,
user_id="system",
user_name=job.user_name or "Scheduler",
channel_id=job.channel_id,
channel_name=f"scheduled-{job.id}",
conversation_id=f"cron-{job.id}",
source=job.channel_type,
is_dm=True,
raw_event={"thread_id": job.thread_id},
)
# Route through the AI handler — use heartbeat runtime if configured
try:
if is_heartbeat and _has_model_route("heartbeat"):
response = _run_with_task_runtime("heartbeat", msg)
else:
response = ai_handler(msg)
if response:
_send_to_channel(
channel_id=job.channel_id,
text=response,
thread_id=job.thread_id,
channel_type=job.channel_type,
)
except Exception as e:
logger.error(f"Scheduled job {job.id} handler failed: {e}", exc_info=True)
def _has_model_route(task_type: str) -> bool:
"""Check if a model route is configured for the given task type."""
cfg = load_config()
route = getattr(cfg.models, task_type, None)
return route is not None and route.model is not None
def _run_with_task_runtime(task_type: str, msg: IncomingMessage) -> str:
"""Run a message through a task-specific runtime instance."""
runtime = _make_runtime(task_type)
system_prompt = build_aetheel_system_prompt(
user_name=msg.user_name,
channel_name=msg.channel_name,
is_dm=msg.is_dm,
extra_context=_build_context(msg),
)
response = runtime.chat(
message=msg.text,
conversation_id=msg.conversation_id,
system_prompt=system_prompt,
)
_track_usage(response)
if not response.ok:
logger.warning(f"Task runtime [{task_type}] error: {response.error}")
return f"⚠️ {task_type} task error: {response.error or 'Unknown error'}"
return _process_action_tags(response.text, msg)
# ---------------------------------------------------------------------------
# Multi-Channel Send
# ---------------------------------------------------------------------------
def _send_to_channel(
channel_id: str,
text: str,
thread_id: str | None,
channel_type: str,
) -> None:
"""
Send a message to a specific channel via the right adapter.
Used by the scheduler and subagent manager to route responses
back to the correct platform.
"""
adapter = _adapters.get(channel_type)
if adapter:
adapter.send_message(
channel_id=channel_id,
text=text,
thread_id=thread_id,
)
else:
# Fallback: try the first available adapter
for a in _adapters.values():
a.send_message(channel_id=channel_id, text=text, thread_id=thread_id)
break
else:
logger.warning(
f"No adapter for '{channel_type}' — cannot send message"
)
# ---------------------------------------------------------------------------
# Runtime Factory (for subagents)
# ---------------------------------------------------------------------------
def _make_runtime(task_type: str | None = None) -> AnyRuntime:
"""Create a runtime instance, optionally routed by task type.
*task_type* can be ``"heartbeat"``, ``"subagent"``, or ``None``
(which falls back to the ``"default"`` route, then the global config).
Model routing is configured in ``config.json`` → ``models``::
"models": {
"heartbeat": { "model": "ollama/llama3.2", "provider": "ollama" },
"subagent": { "model": "minimax/minimax-m1", "provider": "minimax" }
}
"""
global _use_claude, _cli_args
cfg = load_config()
if _cli_args and _cli_args.model:
cfg.runtime.model = _cli_args.model
cfg.claude.model = _cli_args.model
# Resolve model route for this task type
route: ModelRouteConfig | None = None
if task_type:
route = getattr(cfg.models, task_type, None)
if route is None:
route = cfg.models.default # may also be None → use global
# Determine engine: route overrides global
use_claude = _use_claude
if route and route.engine:
use_claude = route.engine == "claude"
if use_claude:
config = ClaudeCodeConfig(
model=(route.model if route and route.model else cfg.claude.model),
timeout_seconds=(route.timeout_seconds if route and route.timeout_seconds else cfg.claude.timeout_seconds),
max_turns=cfg.claude.max_turns,
no_tools=cfg.claude.no_tools,
allowed_tools=cfg.claude.allowed_tools,
)
logger.info(f"Runtime [{task_type or 'default'}]: claude, model={config.model or 'default'}")
return ClaudeCodeRuntime(config)
else:
config = OpenCodeConfig(
mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
server_url=cfg.runtime.server_url,
timeout_seconds=(route.timeout_seconds if route and route.timeout_seconds else cfg.runtime.timeout_seconds),
model=(route.model if route and route.model else cfg.runtime.model),
provider=(route.provider if route and route.provider else cfg.runtime.provider),
workspace_dir=cfg.runtime.workspace,
format=cfg.runtime.format,
agent=cfg.runtime.agent,
attach_url=cfg.runtime.attach,
)
logger.info(f"Runtime [{task_type or 'default'}]: opencode, model={config.model or 'default'}, provider={config.provider or 'default'}")
return OpenCodeRuntime(config)
# ---------------------------------------------------------------------------
# Formatting Helpers
# ---------------------------------------------------------------------------
def _format_status() -> str:
"""Format the /status response with runtime info."""
global _runtime, _scheduler, _skills, _subagent_mgr
lines = [
"🟢 *Aetheel is online*",
"",
]
if _runtime:
status = _runtime.get_status()
engine = "claude" if _use_claude else "opencode"
lines.extend([
f"• *Engine:* {engine}",
f"• *Mode:* {status['mode']}",
f"• *Model:* {status['model']}",
f"• *Provider:* {status['provider']}",
f"• *Active Sessions:* {status['active_sessions']}",
f"• *Live Sessions:* {status.get('live_sessions', 0)}",
])
# Adapter status
if _adapters:
adapter_names = ", ".join(_adapters.keys())
lines.append(f"• *Channels:* {adapter_names}")
# Skills status
if _skills:
skill_count = len(_skills.skills)
lines.append(f"• *Skills Loaded:* {skill_count}")
# Scheduler status
if _scheduler:
jobs = _scheduler.list_jobs()
lines.append(f"• *Scheduled Jobs:* {len(jobs)}")
# Subagents status
if _subagent_mgr:
active = _subagent_mgr.list_active()
lines.append(f"• *Active Subagents:* {len(active)}")
# Model routing
cfg = load_config()
routes = []
for task_name in ("heartbeat", "subagent", "default"):
route = getattr(cfg.models, task_name, None)
if route and route.model:
routes.append(f"{task_name}{route.model}")
if routes:
lines.append(f"• *Model Routes:* {', '.join(routes)}")
lines.extend([
"",
f"• *Time:* {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
])
return "\n".join(lines)
def _format_help() -> str:
"""Format the /help response."""
return (
"🦾 *Aetheel — AI-Powered Assistant*\n"
"\n"
"*Commands:* (type as a regular message, no `/` needed)\n"
"\n"
"*General:*\n"
"• `status` — Bot and runtime status\n"
"• `help` — This help message\n"
"• `time` — Server time\n"
"• `sessions` — Active session count\n"
"• `reload` — Reload config and skills\n"
"\n"
"*Runtime:*\n"
"• `engine` — Show/switch AI engine (opencode or claude)\n"
"• `model` — Show/switch AI model\n"
"• `provider` — Show/switch provider (opencode only)\n"
"• `usage` — Show LLM usage stats and costs\n"
"• `models` — List available models (opencode only)\n"
"• `models <provider>` — List models for a provider\n"
"• `stats` — OpenCode token usage and cost stats\n"
"• `stats <days>` — Stats for last N days\n"
"• `agents` — List available OpenCode agents\n"
"\n"
"*Config:*\n"
"• `config` — View config summary\n"
"• `config show` — View full config.json\n"
"• `config set <key> <value>` — Edit a config value\n"
"\n"
"*Scheduler:*\n"
"• `cron list` — List scheduled jobs\n"
"• `cron remove <id>` — Remove a scheduled job\n"
"• `subagents` — List active background tasks\n"
"\n"
"*MCP Servers:*\n"
"• `mcp list` — List configured MCP servers\n"
"• `mcp add <name> <cmd> [args]` — Add a server\n"
"• `mcp remove <name>` — Remove a server\n"
"\n"
"*Skills:*\n"
"• `skill list` — List loaded skills\n"
"• `skill create <name>` — Create a new skill\n"
"• `skill show <name>` — View a skill\n"
"• `skill remove <name>` — Remove a skill\n"
"\n"
"*AI Chat:*\n"
"• Send any other message and the AI will respond\n"
"• Each thread maintains its own conversation\n"
"• DMs work too — just message me directly\n"
)
def _format_sessions() -> str:
"""Format session info."""
global _runtime
if _runtime:
count = _runtime.get_status()["active_sessions"]
cleaned = _runtime.cleanup_sessions()
return (
f"🧵 *Active Sessions:* {count}\n"
f"🧹 *Cleaned up:* {cleaned} stale sessions"
)
return "⚠️ Runtime not initialized."
def _format_subagents() -> str:
"""Format active subagent tasks."""
global _subagent_mgr
if not _subagent_mgr:
return "⚠️ Subagent manager not initialized."
active = _subagent_mgr.list_active()
if not active:
return "📋 No active subagents."
lines = ["🤖 *Active Subagents:*\n"]
for task in active:
lines.append(f"• `{task.id}` — {task.status}{task.task[:60]}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
global _runtime, _memory, _skills, _scheduler, _subagent_mgr, _heartbeat
global _hook_mgr, _webhook_receiver
global _adapters, _use_claude, _cli_args
parser = argparse.ArgumentParser(
description="Aetheel — AI-Powered Personal Assistant",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
python main.py Start with config-driven adapters
python main.py --claude Override: use Claude Code runtime
python main.py --test Echo-only handler (no AI)
python main.py --model anthropic/claude-sonnet-4-20250514
python main.py --log DEBUG Debug logging
All adapters and features are controlled via ~/.aetheel/config.json.
CLI flags are optional overrides.
""",
)
parser.add_argument("--test", action="store_true", help="Use echo handler for testing")
parser.add_argument("--claude", action="store_true", help="Override: use Claude Code runtime")
parser.add_argument("--cli", action="store_true", help="Override: force CLI mode (OpenCode)")
parser.add_argument("--sdk", action="store_true", help="Override: force SDK mode (OpenCode)")
parser.add_argument("--telegram", action="store_true", help="Override: enable Telegram adapter")
parser.add_argument("--discord", action="store_true", help="Override: enable Discord adapter")
parser.add_argument("--webchat", action="store_true", help="Override: enable WebChat adapter")
parser.add_argument("--model", default=None, help="Override: model to use")
parser.add_argument(
"--log",
default=os.environ.get("LOG_LEVEL", "INFO"),
help="Log level (DEBUG, INFO, WARNING, ERROR)",
)
args = parser.parse_args()
_cli_args = args
# -------------------------------------------------------------------
# 0. Load Configuration (config.json + .env + CLI overrides)
# -------------------------------------------------------------------
save_default_config() # Create ~/.aetheel/config.json if missing
cfg = load_config()
# CLI flags override config (flags are optional overrides, config is primary)
if args.claude:
cfg.runtime.engine = "claude"
_use_claude = cfg.runtime.engine == "claude"
log_level = args.log if args.log != "INFO" else cfg.log_level
if args.model:
cfg.runtime.model = args.model
cfg.claude.model = args.model
if args.cli:
cfg.runtime.mode = "cli"
elif args.sdk:
cfg.runtime.mode = "sdk"
# CLI flags can enable adapters (but config is the primary source)
if args.telegram:
cfg.telegram.enabled = True
if args.discord:
cfg.discord.enabled = True
if args.webchat:
cfg.webchat.enabled = True
# Auto-enable adapters when tokens are present (even if not explicitly enabled)
if cfg.telegram.bot_token and not cfg.telegram.enabled:
cfg.telegram.enabled = True
logger.debug("Telegram auto-enabled: token present")
if cfg.discord.bot_token and not cfg.discord.enabled:
cfg.discord.enabled = True
logger.debug("Discord auto-enabled: token present")
# Configure logging
logging.basicConfig(
level=getattr(logging, log_level.upper(), logging.INFO),
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger.info(f"Config: {CONFIG_PATH}")
# -------------------------------------------------------------------
# 1. Initialize Memory System
# -------------------------------------------------------------------
workspace_dir = os.path.expanduser(cfg.memory.workspace)
db_path = os.path.expanduser(cfg.memory.db_path)
try:
mem_config = MemoryConfig(
workspace_dir=workspace_dir,
db_path=db_path,
)
_memory = MemoryManager(mem_config)
logger.info(f"Memory system initialized: workspace={workspace_dir}")
stats = asyncio.run(_memory.sync())
logger.info(
f"Memory sync: {stats.get('files_indexed', 0)} files indexed, "
f"{stats.get('chunks_created', 0)} chunks"
)
except Exception as e:
logger.warning(f"Memory system init failed (continuing without): {e}")
_memory = None
# -------------------------------------------------------------------
# 2. Initialize Skills System
# -------------------------------------------------------------------
try:
_skills = SkillsManager(workspace_dir)
loaded = _skills.load_all()
logger.info(f"Skills system initialized: {len(loaded)} skill(s)")
except Exception as e:
logger.warning(f"Skills system init failed (continuing without): {e}")
_skills = None
# -------------------------------------------------------------------
# 2½. Write MCP config (before runtime sees it)
# -------------------------------------------------------------------
write_mcp_config(cfg.mcp, cfg.memory.workspace, _use_claude)
# -------------------------------------------------------------------
# 3. Initialize AI Runtime
# -------------------------------------------------------------------
runtime_label = "echo (test mode)"
if not args.test:
if _use_claude:
claude_config = ClaudeCodeConfig(
model=cfg.claude.model,
timeout_seconds=cfg.claude.timeout_seconds,
max_turns=cfg.claude.max_turns,
no_tools=cfg.claude.no_tools,
allowed_tools=cfg.claude.allowed_tools,
)
_runtime = ClaudeCodeRuntime(claude_config)
runtime_label = f"claude-code, model={claude_config.model or 'default'}"
else:
oc_config = OpenCodeConfig(
mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
server_url=cfg.runtime.server_url,
timeout_seconds=cfg.runtime.timeout_seconds,
model=cfg.runtime.model,
provider=cfg.runtime.provider,
workspace_dir=cfg.runtime.workspace,
format=cfg.runtime.format,
agent=cfg.runtime.agent,
attach_url=cfg.runtime.attach,
)
_runtime = OpenCodeRuntime(oc_config)
runtime_label = (
f"opencode/{oc_config.mode.value}, "
f"model={oc_config.model or 'default'}"
)
# -------------------------------------------------------------------
# 4. Initialize Scheduler
# -------------------------------------------------------------------
try:
_scheduler = Scheduler(callback=_on_scheduled_job)
_scheduler.start()
logger.info("Scheduler initialized")
except Exception as e:
logger.warning(f"Scheduler init failed (continuing without): {e}")
_scheduler = None
# -------------------------------------------------------------------
# 4b. Initialize Heartbeat System
# -------------------------------------------------------------------
heartbeat_count = 0
if _scheduler:
try:
_heartbeat = HeartbeatRunner(
scheduler=_scheduler,
ai_handler_fn=ai_handler,
send_fn=_send_to_channel,
config=cfg.heartbeat,
workspace_dir=workspace_dir,
)
heartbeat_count = _heartbeat.start()
logger.info(f"Heartbeat initialized: {heartbeat_count} task(s)")
except Exception as e:
logger.warning(f"Heartbeat init failed (continuing without): {e}")
_heartbeat = None
# -------------------------------------------------------------------
# 5. Initialize Subagent Manager
# -------------------------------------------------------------------
if _runtime:
try:
_subagent_mgr = SubagentManager(
runtime_factory=lambda: _make_runtime("subagent"),
send_fn=_send_to_channel,
)
logger.info("Subagent manager initialized")
except Exception as e:
logger.warning(f"Subagent manager init failed: {e}")
_subagent_mgr = None
# -------------------------------------------------------------------
# 5b. Initialize Hook System
# -------------------------------------------------------------------
hook_count = 0
if cfg.hooks.enabled:
try:
_hook_mgr = HookManager(workspace_dir=workspace_dir)
hooks_found = _hook_mgr.discover()
hook_count = len(hooks_found)
logger.info(f"Hook system initialized: {hook_count} hook(s)")
except Exception as e:
logger.warning(f"Hook system init failed (continuing without): {e}")
_hook_mgr = None
# -------------------------------------------------------------------
# 5c. Initialize Webhook Receiver
# -------------------------------------------------------------------
if cfg.webhooks.enabled:
try:
from webhooks.receiver import WebhookReceiver, WebhookConfig as WHConfig
wh_config = WHConfig(
enabled=cfg.webhooks.enabled,
port=cfg.webhooks.port,
host=cfg.webhooks.host,
token=cfg.webhooks.token,
)
_webhook_receiver = WebhookReceiver(
ai_handler_fn=ai_handler,
send_fn=_send_to_channel,
config=wh_config,
)
_webhook_receiver.start_async()
logger.info(
f"Webhook receiver started at "
f"http://{cfg.webhooks.host}:{cfg.webhooks.port}/hooks/"
)
except Exception as e:
logger.warning(f"Webhook receiver init failed (continuing without): {e}")
_webhook_receiver = None
# -------------------------------------------------------------------
# 6. Initialize Channel Adapters
# -------------------------------------------------------------------
# Choose the message handler
handler = echo_handler if args.test else ai_handler
# Slack adapter (enabled when tokens are present and not disabled)
if cfg.slack.enabled and cfg.slack.bot_token and cfg.slack.app_token:
try:
slack = SlackAdapter(
bot_token=cfg.slack.bot_token,
app_token=cfg.slack.app_token,
log_level=log_level,
)
slack.on_message(handler)
_adapters["slack"] = slack
logger.info("Slack adapter registered")
except Exception as e:
logger.error(f"Slack adapter failed to initialize: {e}")
elif cfg.slack.enabled:
logger.warning("Slack enabled but tokens not set — Slack adapter disabled")
# Telegram adapter (enabled via config or token auto-detection)
if cfg.telegram.enabled:
if cfg.telegram.bot_token:
try:
from adapters.telegram_adapter import TelegramAdapter
telegram = TelegramAdapter(bot_token=cfg.telegram.bot_token)
telegram.on_message(handler)
_adapters["telegram"] = telegram
logger.info("Telegram adapter registered")
except Exception as e:
logger.error(f"Telegram adapter failed to initialize: {e}")
else:
logger.warning(
"Telegram enabled but TELEGRAM_BOT_TOKEN not set. "
"Get a token from @BotFather on Telegram."
)
# Discord adapter (enabled via config or token auto-detection)
if cfg.discord.enabled:
if cfg.discord.bot_token:
try:
from adapters.discord_adapter import DiscordAdapter
discord_adapter = DiscordAdapter(
bot_token=cfg.discord.bot_token,
listen_channels=cfg.discord.listen_channels or None,
reply_to_mode=cfg.discord.reply_to_mode,
history_enabled=cfg.discord.history_enabled,
history_limit=cfg.discord.history_limit,
channel_overrides={
k: {"history_limit": v.history_limit, "history_enabled": v.history_enabled}
for k, v in cfg.discord.channel_overrides.items()
},
ack_reaction=cfg.discord.ack_reaction,
typing_indicator=cfg.discord.typing_indicator,
reaction_mode=cfg.discord.reaction_mode,
exec_approvals=cfg.discord.exec_approvals,
exec_approval_tools=cfg.discord.exec_approval_tools,
slash_commands=cfg.discord.slash_commands,
components_enabled=cfg.discord.components_enabled,
)
discord_adapter.on_message(handler)
_adapters["discord"] = discord_adapter
logger.info("Discord adapter registered")
except Exception as e:
logger.error(f"Discord adapter failed to initialize: {e}")
else:
logger.warning(
"Discord enabled but DISCORD_BOT_TOKEN not set. "
"Get a token from https://discord.com/developers/applications"
)
# WebChat adapter (enabled via config)
if cfg.webchat.enabled:
try:
from adapters.webchat_adapter import WebChatAdapter
webchat = WebChatAdapter(
host=cfg.webchat.host,
port=cfg.webchat.port,
)
webchat.on_message(handler)
_adapters["webchat"] = webchat
logger.info(f"WebChat adapter registered at http://{cfg.webchat.host}:{cfg.webchat.port}")
except Exception as e:
logger.error(f"WebChat adapter failed to initialize: {e}")
if not _adapters:
print("❌ No channel adapters initialized!")
print(" Configure adapters in ~/.aetheel/config.json")
print(" Set tokens in .env (SLACK_BOT_TOKEN, DISCORD_BOT_TOKEN, etc.)")
print(" Or enable webchat: {\"webchat\": {\"enabled\": true}}")
sys.exit(1)
# Start file watching for automatic memory re-indexing
if _memory:
_memory.start_watching()
# -------------------------------------------------------------------
# 7. Start Adapters
# -------------------------------------------------------------------
logger.info("=" * 60)
logger.info(" Aetheel Starting")
logger.info("=" * 60)
logger.info(f" Config: {CONFIG_PATH}")
logger.info(f" Runtime: {runtime_label}")
logger.info(f" Channels: {', '.join(_adapters.keys())}")
logger.info(f" Skills: {len(_skills.skills) if _skills else 0}")
logger.info(f" Scheduler: {'' if _scheduler else ''}")
logger.info(f" Heartbeat: {'' + str(heartbeat_count) + ' tasks' if _heartbeat else ''}")
logger.info(f" Subagents: {'' if _subagent_mgr else ''}")
logger.info(f" Hooks: {'' + str(hook_count) + ' hooks' if _hook_mgr else ''}")
logger.info(f" Webhooks: {'✅ port ' + str(cfg.webhooks.port) if _webhook_receiver else ''}")
logger.info("=" * 60)
try:
# Fire gateway:startup hook
if _hook_mgr:
_hook_mgr.trigger(HookEvent(type="gateway", action="startup"))
if len(_adapters) == 1:
# Single adapter — start it blocking
adapter = next(iter(_adapters.values()))
adapter.start()
else:
# Multiple adapters — start all but last async, last blocking
adapter_list = list(_adapters.values())
for adapter in adapter_list[:-1]:
adapter.start_async()
adapter_list[-1].start() # blocking
except KeyboardInterrupt:
logger.info("Shutting down...")
finally:
# Fire gateway:shutdown hook
if _hook_mgr:
_hook_mgr.trigger(HookEvent(type="gateway", action="shutdown"))
# Cleanup
if _webhook_receiver:
_webhook_receiver.stop()
for adapter in _adapters.values():
try:
adapter.stop()
except Exception:
pass
if _scheduler:
_scheduler.stop()
if _memory:
_memory.close()
logger.info("Aetheel stopped. Goodbye! 👋")
if __name__ == "__main__":
main()