Files
Aetheel/main.py
tanmay11k 6d73f74e0b feat: config-driven architecture, install wizard, live runtime switching, usage tracking, auto-failover
Major changes:
- Config-driven adapters: all channels (Slack, Discord, Telegram, WebChat, Webhooks) controlled via config.json with enabled flags and token auto-detection, no CLI flags required
- Runtime engine field: runtime.engine selects opencode/claude from config
- Interactive install script: 8-phase setup wizard with AI runtime detection/installation, token setup, identity file personalization (personality presets), aetheel CLI command, background service (launchd/systemd)
- Live runtime switching: /engine, /model, /provider commands hot-swap the AI runtime from chat without restart, changes persisted to config.json
- Usage tracking: per-request cost extraction from Claude Code JSON output, cumulative stats via /usage command
- Auto-failover: rate limit detection on both runtimes, automatic switch to other engine on quota errors with user notification
- Chat commands work without / prefix (Slack intercepts / in channels), commands: engine, model, provider, config, usage, reload, cron, subagents, status, help
- /config set for editing config.json from chat with dotted key notation
- Security audit saved to docs/security-audit.md
- Full command reference in docs/commands.md
- Future changes doc with NanoClaw agent teams analysis
- Logo added to README and WebChat UI
- README fully rewritten with all features documented
2026-02-18 01:07:12 -05:00

1522 lines
53 KiB
Python

#!/usr/bin/env python3
"""
Aetheel — Main Entry Point
============================
Starts the AI assistant with multi-channel adapters, memory, skills,
scheduled tasks, and subagent support.
Usage:
    python main.py                  Start with Slack + AI handler
    python main.py --telegram       Also enable Telegram adapter
    python main.py --discord        Also enable Discord adapter
    python main.py --claude         Use Claude Code runtime
    python main.py --test           Echo handler for testing
    python main.py --model anthropic/claude-sonnet-4-20250514
    python main.py --log DEBUG      Debug logging
"""
import argparse
import asyncio
import logging
import os
import re
import sys
import threading
from datetime import datetime
from dotenv import load_dotenv
# Load .env file (secrets only — config comes from ~/.aetheel/config.json)
load_dotenv()
from adapters.base import BaseAdapter, IncomingMessage
from adapters.slack_adapter import SlackAdapter
from agent.claude_runtime import ClaudeCodeConfig, ClaudeCodeRuntime
from agent.opencode_runtime import (
AgentResponse,
OpenCodeConfig,
OpenCodeRuntime,
RuntimeMode,
build_aetheel_system_prompt,
)
from agent.subagent import SubagentManager
from config import AetheelConfig, load_config, save_default_config, write_mcp_config, CONFIG_PATH
from heartbeat import HeartbeatRunner
from hooks import HookManager, HookEvent
from memory import MemoryManager
from memory.types import MemoryConfig
from scheduler import Scheduler
from scheduler.store import ScheduledJob
from skills import SkillsManager
# Shared logger used by every handler in this module.
logger = logging.getLogger("aetheel")
# Type alias for either runtime
AnyRuntime = OpenCodeRuntime | ClaudeCodeRuntime
# Global instances (initialized in main). Each one starts as None, which is
# why the handlers below None-check them before use.
_runtime: AnyRuntime | None = None
_memory: MemoryManager | None = None
_skills: SkillsManager | None = None
_scheduler: Scheduler | None = None
_subagent_mgr: SubagentManager | None = None
_heartbeat: HeartbeatRunner | None = None
_hook_mgr: HookManager | None = None
_webhook_receiver = None # WebhookReceiver | None
_adapters: dict[str, BaseAdapter] = {} # source_name -> adapter
# Runtime config (stored for subagent factory)
# _use_claude selects the engine label everywhere in this module:
# True -> "claude", False -> "opencode".
_use_claude: bool = False
# Parsed command-line arguments; _make_runtime() reads its `model` override.
_cli_args: argparse.Namespace | None = None
# Usage tracking
# In-memory counters only: updated by _track_usage() / _handle_rate_limit()
# and rendered by _handle_usage_command().
_usage_stats: dict = {
    "total_requests": 0,
    "total_cost_usd": 0.0,
    "total_duration_ms": 0,
    "requests_by_engine": {"opencode": 0, "claude": 0},
    "cost_by_engine": {"opencode": 0.0, "claude": 0.0},
    "rate_limit_hits": 0,
    "failovers": 0,
    "last_rate_limit": None, # ISO timestamp
    # Captured once at import time (naive local time).
    "session_start": datetime.now().isoformat(),
}
# Regex for parsing action tags from AI responses
# [ACTION:remind|<minutes>|<message>] -> groups: minutes (digits), message
_ACTION_RE = re.compile(r"\[ACTION:remind\|(\d+)\|(.+?)\]", re.DOTALL)
# [ACTION:cron|<cron_expr>|<prompt>] -> groups: cron expression, prompt
_CRON_RE = re.compile(r"\[ACTION:cron\|([\d\*/,\- ]+)\|(.+?)\]", re.DOTALL)
# [ACTION:spawn|<task description>] -> group: task description
_SPAWN_RE = re.compile(r"\[ACTION:spawn\|(.+?)\]", re.DOTALL)
# ---------------------------------------------------------------------------
# Message Handlers
# ---------------------------------------------------------------------------
def echo_handler(msg: IncomingMessage) -> str:
    """Echo the incoming message back as a formatted summary (test handler).

    The reply lists the text, sender, channel, source, the first 15
    characters of the conversation id, and the message timestamp, one item
    per line. No AI runtime is involved.
    """
    conv_prefix = msg.conversation_id[:15]
    received_at = msg.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')
    summary = (
        "👋 *Aetheel received your message!*",
        "",
        f"📝 *Text:* {msg.text}",
        f"👤 *From:* {msg.user_name} (`{msg.user_id}`)",
        f"📍 *Channel:* {msg.channel_name} (`{msg.channel_id}`)",
        f"💬 *Source:* {msg.source}",
        f"🧵 *ConvID:* `{conv_prefix}...`",
        f"🕐 *Time:* {received_at}",
        "",
        "_This is an echo response from the Aetheel test handler._",
    )
    return "\n".join(summary)
def _build_context(msg: IncomingMessage) -> str:
    """
    Assemble the extra context that is injected into the system prompt.

    Sections are emitted in this order, each only when non-empty:
        1. Identity files read through the memory manager
           (SOUL.md, USER.md, MEMORY.md)
        2. Memory search hits for the message text (identity files excluded)
        3. Skill instructions matching the message
        4. Summary of all available skills

    Returns:
        The sections joined by a horizontal-rule separator; an empty string
        when nothing is available.
    """
    global _memory, _skills
    parts: list[str] = []

    if _memory:
        # (heading, reader) pairs; readers are invoked lazily and in order.
        identity_sources = (
            ("# Your Identity (SOUL.md)", _memory.read_soul),
            ("# About the User (USER.md)", _memory.read_user),
            ("# Long-Term Memory (MEMORY.md)", _memory.read_long_term_memory),
        )
        for heading, read in identity_sources:
            body = read()
            if body:
                parts.append(f"{heading}\n\n{body}")

        # Memory search is best-effort: any failure is logged at debug level
        # and the section is simply omitted.
        # NOTE(review): asyncio.run() raises RuntimeError when an event loop is
        # already running in the calling thread; that would be swallowed here.
        # Confirm handlers are always invoked off-loop.
        try:
            hits = asyncio.run(_memory.search(msg.text, max_results=3, min_score=0.2))
            if hits:
                excerpts = [
                    f"**{hit.path}** (lines {hit.start_line}-{hit.end_line}, "
                    f"relevance {hit.score:.0%}):\n{hit.snippet[:500]}"
                    for hit in hits
                    # Identity files are already included verbatim above.
                    if hit.path not in ("SOUL.md", "USER.md", "MEMORY.md")
                ]
                if excerpts:
                    parts.append(
                        "# Relevant Memory Context\n\n"
                        + "\n\n---\n\n".join(excerpts)
                    )
        except Exception as e:
            logger.debug(f"Memory search failed: {e}")

    if _skills:
        # Instructions for skills that match this message, then the summary
        # of everything that is available.
        matched = _skills.get_context(msg.text)
        if matched:
            parts.append(matched)
        overview = _skills.get_all_context()
        if overview:
            parts.append(overview)

    return "\n\n---\n\n".join(parts)
def ai_handler(msg: IncomingMessage) -> str:
    """
    AI-powered handler — the heart of Aetheel.

    Flow:
        Message → built-in command dispatch (bypasses the AI)
        → context (memory + skills) → system prompt → runtime.chat()
        → usage tracking / rate-limit failover
        → action tags → session log → response

    Args:
        msg: Incoming message from any adapter.

    Returns:
        The reply text to send back to the originating channel.
    """
    global _runtime, _memory, _scheduler

    if _runtime is None:
        return "⚠️ AI runtime not initialized. Please restart the service."

    # Built-in commands (bypass AI)
    # Commands work with or without "/" prefix.
    # In Slack channels, "/" is intercepted as a native slash command,
    # so users should use the prefix-free form (e.g. "engine claude").
    # In DMs and other adapters, both forms work.
    cmd = msg.text.lower().strip().lstrip("/")
    # Original-case variant for handlers whose arguments are case-sensitive
    # (model names, config values).
    raw_cmd = msg.text.strip().lstrip("/")
    # Dispatch on the first word rather than a string prefix. Because the "/"
    # is optional, prefix matching would swallow ordinary chat that merely
    # begins with a command name ("configure ...", "modeling ...",
    # "engineering ...") instead of sending it to the AI.
    words = cmd.split(maxsplit=1)
    verb = words[0] if words else ""

    if cmd in ("status", "ping"):
        return _format_status()
    if cmd == "help":
        return _format_help()
    if cmd == "time":
        return f"🕐 Server time: *{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*"
    if cmd == "sessions":
        return _format_sessions()

    # Cron management commands
    if verb == "cron":
        return _handle_cron_command(cmd)

    # Reload config and skills
    if cmd == "reload":
        # The loaded config object is not applied to any running component
        # here, so the return value is intentionally discarded.
        load_config()
        if _skills:
            _skills.reload()
        if _hook_mgr:
            _hook_mgr.trigger(HookEvent(type="command", action="reload"))
        return "🔄 Config and skills reloaded."

    # List active subagents
    if cmd == "subagents":
        return _format_subagents()

    # Runtime management commands
    if verb in ("engine", "runtime"):
        return _handle_engine_command(raw_cmd)
    if verb == "model":
        return _handle_model_command(raw_cmd)
    if verb == "provider":
        return _handle_provider_command(raw_cmd)
    if verb == "config":
        return _handle_config_command(raw_cmd)
    if verb == "usage":
        return _handle_usage_command()

    # Build context from memory + skills
    context = _build_context(msg)

    # Route to AI via runtime
    system_prompt = build_aetheel_system_prompt(
        user_name=msg.user_name,
        channel_name=msg.channel_name,
        is_dm=msg.is_dm,
        extra_context=context,
    )
    response = _runtime.chat(
        message=msg.text,
        conversation_id=msg.conversation_id,
        system_prompt=system_prompt,
    )

    # Track usage stats
    _track_usage(response)

    # Rate limit detection + auto-failover
    if response.rate_limited:
        failover_response = _handle_rate_limit(msg, system_prompt, response)
        if failover_response is not None:
            return failover_response

    if not response.ok:
        error_msg = response.error or "Unknown error"
        logger.error(f"AI error: {error_msg}")
        lowered = error_msg.lower()
        if "not found" in lowered or "not installed" in lowered:
            return (
                "⚠️ AI CLI is not available.\n"
                "Check the runtime installation docs."
            )
        if "timeout" in lowered:
            return (
                "⏳ The AI took too long to respond. "
                "Try a shorter or simpler question."
            )
        # Cap the echoed error so a long message cannot flood the chat.
        return f"⚠️ AI error: {error_msg[:200]}"

    logger.info(
        f"🤖 AI response: {len(response.text)} chars, "
        f"{response.duration_ms}ms"
        f"{', $' + str(response.usage.get('cost_usd', 0)) if response.usage else ''}"
    )

    # Parse and execute action tags (reminders, cron, spawn)
    reply_text = _process_action_tags(response.text, msg)

    # Log conversation to memory session log (best-effort: a logging failure
    # must not prevent the reply from being delivered).
    if _memory:
        try:
            channel = "dm" if msg.is_dm else msg.channel_name or msg.source
            _memory.log_session(
                f"**User ({msg.user_name}):** {msg.text}\n\n"
                f"**Aetheel:** {reply_text}",
                channel=channel,
            )
        except Exception as e:
            logger.debug(f"Session logging failed: {e}")

    return reply_text
# ---------------------------------------------------------------------------
# Action Tag Processing
# ---------------------------------------------------------------------------
def _process_action_tags(text: str, msg: IncomingMessage) -> str:
    """
    Execute the action tags embedded in an AI response and strip them out.

    Recognised tags:
        [ACTION:remind|<minutes>|<message>]   one-shot reminder
        [ACTION:cron|<cron_expr>|<prompt>]    recurring cron job
        [ACTION:spawn|<task description>]     background subagent

    Tags are matched against the original ``text``. Every matched tag is
    removed from the returned text whether or not the action succeeded;
    failures are logged as warnings.

    Returns:
        ``text`` with all matched tags removed and surrounding whitespace
        trimmed.
    """

    def _origin() -> dict:
        # Routing fields shared by all three actions. Evaluated lazily, inside
        # the caller's try block, so a problem reading the message (e.g. with
        # raw_event) is handled like any other action failure.
        return {
            "channel_id": msg.channel_id,
            "channel_type": msg.source,
            "thread_id": msg.raw_event.get("thread_id"),
            "user_name": msg.user_name,
        }

    remaining = text

    # ── Remind tags (one-shot) ──
    for tag in _ACTION_RE.finditer(text):
        raw_minutes, reminder = tag.groups()
        try:
            delay = int(raw_minutes)
            if _scheduler:
                _scheduler.add_once(
                    delay_minutes=delay,
                    prompt=reminder.strip(),
                    **_origin(),
                )
                logger.info(
                    f"⏰ Reminder scheduled: '{reminder.strip()[:50]}' "
                    f"in {delay} min for {msg.source}/{msg.channel_name}"
                )
        except Exception as e:
            logger.warning(f"Failed to schedule reminder: {e}")
        remaining = remaining.replace(tag.group(0), "").strip()

    # ── Cron tags (recurring) ──
    for tag in _CRON_RE.finditer(text):
        expr = tag.group(1).strip()
        prompt = tag.group(2).strip()
        try:
            if _scheduler:
                job_id = _scheduler.add_cron(
                    cron_expr=expr,
                    prompt=prompt,
                    **_origin(),
                )
                logger.info(
                    f"🔄 Cron scheduled: '{prompt[:50]}' ({expr}) "
                    f"job_id={job_id}"
                )
        except Exception as e:
            logger.warning(f"Failed to schedule cron job: {e}")
        remaining = remaining.replace(tag.group(0), "").strip()

    # ── Spawn tags (subagent) ──
    for tag in _SPAWN_RE.finditer(text):
        task = tag.group(1).strip()
        try:
            if _subagent_mgr:
                task_id = _subagent_mgr.spawn(task=task, **_origin())
                logger.info(
                    f"🚀 Subagent spawned: '{task[:50]}' "
                    f"task_id={task_id}"
                )
        except Exception as e:
            logger.warning(f"Failed to spawn subagent: {e}")
        remaining = remaining.replace(tag.group(0), "").strip()

    return remaining
# ---------------------------------------------------------------------------
# Cron Management Commands
# ---------------------------------------------------------------------------
def _handle_cron_command(text: str) -> str:
    """
    Handle /cron subcommands.

        cron                List scheduled jobs
        cron list           List scheduled jobs
        cron remove <id>    Remove a scheduled job

    Args:
        text: The full command text, starting with ``cron``.

    Returns:
        A chat-formatted reply: the job list, a removal confirmation, or a
        usage hint for anything unrecognised.
    """
    global _scheduler

    if not _scheduler:
        return "⚠️ Scheduler not initialized."

    parts = text.strip().split(maxsplit=2)

    if len(parts) < 2 or parts[1] == "list":
        jobs = _scheduler.list_jobs()
        if not jobs:
            return "📋 No scheduled jobs."
        lines = ["📋 *Scheduled Jobs:*\n"]
        for job in jobs:
            kind = f"🔄 `{job.cron_expr}`" if job.is_recurring else "⏰ one-shot"
            # Separate the schedule from the prompt preview; without the
            # separator the two run together ("one-shotBuy milk").
            lines.append(f"• `{job.id}` — {kind} — {job.prompt[:60]}")
        return "\n".join(lines)

    if parts[1] == "remove" and len(parts) >= 3:
        job_id = parts[2].strip()
        if _scheduler.remove(job_id):
            return f"✅ Job `{job_id}` removed."
        return f"⚠️ Job `{job_id}` not found."

    return "Usage: `cron list` or `cron remove <id>`"
# ---------------------------------------------------------------------------
# Runtime / Model / Provider Management Commands
# ---------------------------------------------------------------------------
def _handle_engine_command(text: str) -> str:
    """
    Handle /engine (or /runtime) commands.

        /engine             Show current engine
        /engine opencode    Switch to OpenCode
        /engine claude      Switch to Claude Code

    The new runtime is fully constructed before any module state changes, so
    a failed switch leaves both ``_runtime`` and ``_use_claude`` untouched.
    A successful switch is then persisted to config.json; if only the
    persistence step fails, the switch stays active and the reply says so.

    Args:
        text: The full command text (without the leading "/").

    Returns:
        A chat-formatted status, confirmation, usage hint or error message.
    """
    global _runtime, _use_claude

    parts = text.strip().split(maxsplit=1)

    if len(parts) < 2:
        engine = "claude" if _use_claude else "opencode"
        status = _runtime.get_status() if _runtime else {}
        model = status.get("model", "default")
        provider = status.get("provider", "auto")
        return (
            f"🧠 *Current Engine:* `{engine}`\n"
            f"• *Model:* `{model}`\n"
            f"• *Provider:* `{provider}`\n"
            f"\n"
            f"Switch with: `/engine opencode` or `/engine claude`"
        )

    target = parts[1].strip().lower()
    if target not in ("opencode", "claude"):
        return "Usage: `/engine opencode` or `/engine claude`"

    want_claude = target == "claude"
    if want_claude == _use_claude:
        return f"Already using `{target}`."

    # Build (and probe) the replacement runtime first. Nothing global is
    # modified until this has succeeded.
    try:
        cfg = load_config()
        if want_claude:
            new_runtime = ClaudeCodeRuntime(ClaudeCodeConfig(
                model=cfg.claude.model,
                timeout_seconds=cfg.claude.timeout_seconds,
                max_turns=cfg.claude.max_turns,
                no_tools=cfg.claude.no_tools,
                allowed_tools=cfg.claude.allowed_tools,
            ))
        else:
            new_runtime = OpenCodeRuntime(OpenCodeConfig(
                mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
                server_url=cfg.runtime.server_url,
                timeout_seconds=cfg.runtime.timeout_seconds,
                model=cfg.runtime.model,
                provider=cfg.runtime.provider,
                workspace_dir=cfg.runtime.workspace,
                format=cfg.runtime.format,
            ))
        status = new_runtime.get_status()
    except Exception as e:
        logger.error(f"Engine switch failed: {e}", exc_info=True)
        return f"⚠️ Failed to switch engine: {e}"

    # Hot-swap: commit the runtime and the engine flag together so they can
    # never disagree.
    _runtime = new_runtime
    _use_claude = want_claude

    # Persist to config.json. A failure here does not undo the live switch.
    persist_note = ""
    try:
        _update_config_file({"runtime": {"engine": target}})
    except Exception as e:
        logger.error(f"Engine switch could not be persisted: {e}", exc_info=True)
        persist_note = f"\n⚠️ Could not save the change to config.json: {e}"

    return (
        f"✅ Switched to `{target}`\n"
        f"• *Model:* `{status.get('model', 'default')}`\n"
        f"• *Provider:* `{status.get('provider', 'auto')}`"
        f"{persist_note}"
    )
def _handle_model_command(text: str) -> str:
    """
    Handle /model commands.

        /model              Show current model
        /model <name>       Switch model (hot-swap)
        /model anthropic/claude-sonnet-4-20250514
        /model openai/gpt-4o

    Switching rebuilds the runtime for the currently active engine with the
    new model and writes the model to that engine's section of config.json.
    """
    global _runtime, _use_claude

    tokens = text.strip().split(maxsplit=1)

    if len(tokens) >= 2:
        new_model = tokens[1].strip()
        if not new_model:
            return "Usage: `/model <model-name>`"

        try:
            cfg = load_config()
            if not _use_claude:
                # OpenCode: model lives under the "runtime" section.
                cfg.runtime.model = new_model
                opencode_cfg = OpenCodeConfig(
                    mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
                    server_url=cfg.runtime.server_url,
                    timeout_seconds=cfg.runtime.timeout_seconds,
                    model=new_model,
                    provider=cfg.runtime.provider,
                    workspace_dir=cfg.runtime.workspace,
                    format=cfg.runtime.format,
                )
                _runtime = OpenCodeRuntime(opencode_cfg)
                _update_config_file({"runtime": {"model": new_model}})
            else:
                # Claude Code: model lives under the "claude" section.
                cfg.claude.model = new_model
                claude_cfg = ClaudeCodeConfig(
                    model=new_model,
                    timeout_seconds=cfg.claude.timeout_seconds,
                    max_turns=cfg.claude.max_turns,
                    no_tools=cfg.claude.no_tools,
                    allowed_tools=cfg.claude.allowed_tools,
                )
                _runtime = ClaudeCodeRuntime(claude_cfg)
                _update_config_file({"claude": {"model": new_model}})
            return f"✅ Model switched to `{new_model}`"
        except Exception as e:
            logger.error(f"Model switch failed: {e}", exc_info=True)
            return f"⚠️ Failed to switch model: {e}"

    # No argument given: report the current model and how to change it.
    status = _runtime.get_status() if _runtime else {}
    return (
        f"🧠 *Current Model:* `{status.get('model', 'default')}`\n"
        f"• *Engine:* `{'claude' if _use_claude else 'opencode'}`\n"
        f"• *Provider:* `{status.get('provider', 'auto')}`\n"
        f"\n"
        f"Switch with: `/model <model-name>`\n"
        f"Examples:\n"
        f"• `/model anthropic/claude-sonnet-4-20250514`\n"
        f"• `/model openai/gpt-4o`\n"
        f"• `/model google/gemini-2.5-pro`"
    )
def _handle_provider_command(text: str) -> str:
    """
    Handle /provider commands (OpenCode only).

        /provider             Show current provider
        /provider anthropic   Switch provider
        /provider openai      Switch provider
        /provider auto        Clear the explicit provider

    ``auto`` is represented as ``None`` both in the live runtime config and
    in config.json, so the persisted value matches what the runtime actually
    received.

    Args:
        text: The full command text (without the leading "/").

    Returns:
        A chat-formatted status, confirmation or error message.
    """
    global _runtime, _use_claude

    if _use_claude:
        return "Provider selection is only available with the OpenCode engine. Claude Code always uses Anthropic."

    parts = text.strip().split(maxsplit=1)

    if len(parts) < 2:
        status = _runtime.get_status() if _runtime else {}
        return (
            f"🔌 *Current Provider:* `{status.get('provider', 'auto')}`\n"
            f"\n"
            f"Switch with: `/provider <name>`\n"
            f"Examples: `anthropic`, `openai`, `google`, `auto`"
        )

    new_provider = parts[1].strip().lower()
    # "auto" means "no explicit provider"; use the same value for the runtime
    # and for the persisted config. Writing the literal string "auto" to disk
    # would be passed verbatim as a provider name by every code path that
    # builds a runtime from config.json.
    provider_value = None if new_provider == "auto" else new_provider

    try:
        cfg = load_config()
        cfg.runtime.provider = provider_value
        new_config = OpenCodeConfig(
            mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
            server_url=cfg.runtime.server_url,
            timeout_seconds=cfg.runtime.timeout_seconds,
            model=cfg.runtime.model,
            provider=provider_value,
            workspace_dir=cfg.runtime.workspace,
            format=cfg.runtime.format,
        )
        _runtime = OpenCodeRuntime(new_config)
        _update_config_file({"runtime": {"provider": provider_value}})
        return f"✅ Provider switched to `{new_provider}`"
    except Exception as e:
        logger.error(f"Provider switch failed: {e}", exc_info=True)
        return f"⚠️ Failed to switch provider: {e}"
def _handle_config_command(text: str) -> str:
    """
    Handle /config commands — view and edit config from chat.

        /config                     Show key settings
        /config show                Show full config
        /config set <key> <val>     Set a config value

    ``<key>`` uses dotted notation (``runtime.model``). ``<val>`` is parsed
    as JSON when possible (booleans, numbers, null, quoted strings) and is
    otherwise stored as a plain string.

    Args:
        text: The full command text (without the leading "/").

    Returns:
        A chat-formatted summary, config dump, confirmation or error message.
    """
    import json as _json

    parts = text.strip().split(maxsplit=2)
    # Subcommands are matched case-insensitively, like the command itself.
    subcommand = parts[1].lower() if len(parts) > 1 else "summary"

    if subcommand == "show":
        try:
            with open(CONFIG_PATH, "r", encoding="utf-8") as f:
                data = _json.load(f)
            formatted = _json.dumps(data, indent=2)
            # Keep the reply to a size chat platforms will accept.
            if len(formatted) > 3000:
                formatted = formatted[:3000] + "\n... (truncated)"
            return f"```\n{formatted}\n```"
        except Exception as e:
            return f"⚠️ Failed to read config: {e}"

    if subcommand == "set" and len(parts) >= 3:
        kv_parts = parts[2].strip().split(maxsplit=1)
        if len(kv_parts) < 2:
            return "Usage: `/config set runtime.model anthropic/claude-sonnet-4-20250514`"
        key_path, value_str = kv_parts

        # Parse value (handle booleans, numbers, null, strings)
        try:
            value = _json.loads(value_str)
        except _json.JSONDecodeError:
            value = value_str

        keys = key_path.split(".")
        # Require at least section.key and reject empty segments such as
        # "runtime..model" or ".model", which would create ""-named keys.
        if len(keys) < 2 or not all(keys):
            return "Use dotted notation: `/config set runtime.model <value>`"

        # Wrap the value from the innermost key outwards:
        # a.b.c = v  ->  {"a": {"b": {"c": v}}}
        update: dict = {keys[-1]: value}
        for key in reversed(keys[:-1]):
            update = {key: update}

        try:
            _update_config_file(update)
            return f"✅ Set `{key_path}` = `{value_str}`\nRun `/reload` to apply changes."
        except Exception as e:
            return f"⚠️ Failed to update config: {e}"

    # Anything else (including a bare "/config" or an incomplete "set"):
    # show the summary.
    cfg = load_config()
    engine = cfg.runtime.engine
    model = cfg.claude.model if engine == "claude" else cfg.runtime.model
    provider = cfg.runtime.provider if engine == "opencode" else "anthropic"

    def _flag(enabled) -> str:
        # Visible on/off marker; an empty string for both states would make
        # the channel list unreadable.
        return "✅" if enabled else "❌"

    lines = [
        "⚙️ *Configuration Summary*",
        "",
        f"• *Engine:* `{engine}`",
        f"• *Model:* `{model or 'default'}`",
        f"• *Provider:* `{provider or 'auto'}`",
        f"• *Mode:* `{cfg.runtime.mode}`",
        f"• *Timeout:* `{cfg.runtime.timeout_seconds}s`",
        "",
        f"• *Slack:* {_flag(cfg.slack.enabled)}",
        f"• *Discord:* {_flag(cfg.discord.enabled)}",
        f"• *Telegram:* {_flag(cfg.telegram.enabled)}",
        f"• *WebChat:* {_flag(cfg.webchat.enabled)}",
        f"• *Webhooks:* {_flag(cfg.webhooks.enabled)}",
        "",
        "Use `/config show` for full config, `/config set <key> <value>` to edit.",
        "Use `/engine`, `/model`, `/provider` for live switching.",
    ]
    return "\n".join(lines)
def _update_config_file(updates: dict) -> None:
    """
    Merge updates into ~/.aetheel/config.json.

    Deep merge so partial updates don't clobber existing keys: nested dicts
    are merged key by key, every other value is replaced.

    The result is written to a sibling temporary file and then moved over the
    real file with ``os.replace``, so an interrupted write cannot leave a
    truncated config.json behind.

    Args:
        updates: Nested dict of the keys to set.

    Raises:
        ValueError: If the existing file's top-level JSON value is not an
            object.
        OSError: If the file cannot be read or written.
        json.JSONDecodeError: If the existing file is not valid JSON.
    """
    import json as _json

    def _deep_merge(base: dict, overlay: dict) -> dict:
        for key, value in overlay.items():
            if isinstance(base.get(key), dict) and isinstance(value, dict):
                _deep_merge(base[key], value)
            else:
                base[key] = value
        return base

    config_path = os.fspath(CONFIG_PATH)

    data: dict = {}
    if os.path.isfile(config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            data = _json.load(f)
        if not isinstance(data, dict):
            raise ValueError(
                f"{config_path} must contain a JSON object at the top level"
            )

    _deep_merge(data, updates)

    parent = os.path.dirname(config_path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    # Write-then-rename: readers see either the old or the new file, never a
    # partially written one.
    tmp_path = config_path + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            _json.dump(data, f, indent=2)
        os.replace(tmp_path, config_path)
    except BaseException:
        # Do not leave a stray temp file behind on failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

    logger.info(f"Config updated: {list(updates.keys())}")
# ---------------------------------------------------------------------------
# Usage Tracking, Rate Limit Failover, Notifications
# ---------------------------------------------------------------------------
def _track_usage(response: AgentResponse, engine: str | None = None) -> None:
    """
    Track usage stats from an AI response.

    Args:
        response: The runtime's response; its duration, optional usage dict
            (``cost_usd``) and rate-limit flag are read.
        engine: Engine to attribute the request to (``"claude"`` or
            ``"opencode"``). Defaults to the currently active engine. Pass it
            explicitly when the response came from a runtime other than the
            active one, e.g. a failover attempt.
    """
    global _usage_stats

    if engine is None:
        engine = "claude" if _use_claude else "opencode"

    _usage_stats["total_requests"] += 1
    _usage_stats["requests_by_engine"][engine] = (
        _usage_stats["requests_by_engine"].get(engine, 0) + 1
    )
    _usage_stats["total_duration_ms"] += response.duration_ms

    if response.usage:
        # cost_usd may be missing or None; count it as zero.
        cost = response.usage.get("cost_usd", 0) or 0
        _usage_stats["total_cost_usd"] += cost
        _usage_stats["cost_by_engine"][engine] = (
            _usage_stats["cost_by_engine"].get(engine, 0) + cost
        )

    if response.rate_limited:
        _usage_stats["rate_limit_hits"] += 1
        _usage_stats["last_rate_limit"] = datetime.now().isoformat()


def _handle_rate_limit(
    msg: IncomingMessage,
    system_prompt: str,
    original_response: AgentResponse,
) -> str | None:
    """
    Handle a rate-limited response. Attempts auto-failover to the other
    engine and notifies the user.

    On success the failover runtime becomes the active runtime for all
    subsequent messages (the switch is kept in memory only; config.json is
    not modified here).

    Args:
        msg: The message that triggered the rate limit.
        system_prompt: The system prompt already built for ``msg``.
        original_response: The rate-limited response from the active engine.

    Returns:
        The text to send to the user: the rate-limit notice followed by
        either the failover reply or the reason failover did not work.
        ``None`` is reserved for "failover isn't possible"; every path in
        this implementation returns a string.
    """
    global _runtime, _use_claude, _usage_stats

    current_engine = "claude" if _use_claude else "opencode"
    other_engine = "opencode" if _use_claude else "claude"

    logger.warning(
        f"Rate limit hit on {current_engine}: {original_response.error}"
    )

    # Notify the user about the rate limit
    notify_text = (
        f"⚠️ *Rate limit reached on {current_engine}*\n"
        f"{original_response.error or 'Usage limit exceeded.'}"
    )

    # Try auto-failover to the other engine. Loading the config is part of
    # the guarded section so a broken config.json produces a reply instead of
    # an unhandled exception in the message handler.
    try:
        cfg = load_config()
        if _use_claude:
            # Failover: Claude → OpenCode
            failover_runtime = OpenCodeRuntime(OpenCodeConfig(
                mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
                server_url=cfg.runtime.server_url,
                timeout_seconds=cfg.runtime.timeout_seconds,
                model=cfg.runtime.model,
                provider=cfg.runtime.provider,
                workspace_dir=cfg.runtime.workspace,
                format=cfg.runtime.format,
            ))
        else:
            # Failover: OpenCode → Claude
            failover_runtime = ClaudeCodeRuntime(ClaudeCodeConfig(
                model=cfg.claude.model,
                timeout_seconds=cfg.claude.timeout_seconds,
                max_turns=cfg.claude.max_turns,
                no_tools=cfg.claude.no_tools,
                allowed_tools=cfg.claude.allowed_tools,
            ))
    except Exception as e:
        logger.warning(f"Failover engine ({other_engine}) not available: {e}")
        return f"{notify_text}\n\n{other_engine} is not available for failover."

    # Attempt the request on the failover engine
    logger.info(f"Attempting failover: {current_engine} → {other_engine}")
    try:
        failover_response = failover_runtime.chat(
            message=msg.text,
            conversation_id=msg.conversation_id,
            system_prompt=system_prompt,
        )
        # Attribute this request to the engine that actually served it; the
        # active-engine flag has not been flipped yet at this point.
        _track_usage(failover_response, engine=other_engine)

        if not failover_response.ok:
            # Failover also failed
            return (
                f"{notify_text}\n"
                f"Failover to {other_engine} also failed: "
                f"{failover_response.error or 'Unknown error'}"
            )

        # Failover succeeded — switch the active runtime
        _runtime = failover_runtime
        _use_claude = not _use_claude
        _usage_stats["failovers"] += 1
        logger.info(f"Failover successful → now using {other_engine}")

        # Process action tags on the failover response
        reply_text = _process_action_tags(failover_response.text, msg)
        return (
            f"{notify_text}\n"
            f"🔄 Auto-switched to *{other_engine}*.\n\n"
            f"{reply_text}"
        )
    except Exception as e:
        logger.error(f"Failover request failed: {e}", exc_info=True)
        return f"{notify_text}\nFailover to {other_engine} failed: {e}"
def _handle_usage_command() -> str:
    """
    Render the in-memory usage counters for the `usage` command.

    Shows totals, a per-engine breakdown (engines with zero requests are
    omitted), the last rate-limit timestamp if there was one, and the
    currently active engine and model.
    """
    global _usage_stats
    stats = _usage_stats

    report = [
        "📊 *Usage Stats* (since last restart)",
        "",
        f"• *Total Requests:* {stats['total_requests']}",
        f"• *Total Cost:* ${stats['total_cost_usd']:.4f}",
        f"• *Total Duration:* {stats['total_duration_ms'] / 1000:.1f}s",
        f"• *Rate Limit Hits:* {stats['rate_limit_hits']}",
        f"• *Auto-Failovers:* {stats['failovers']}",
        f"• *Session Start:* {stats['session_start']}",
        "",
        "*By Engine:*",
    ]

    for name in ("opencode", "claude"):
        request_count = stats["requests_by_engine"].get(name, 0)
        if request_count <= 0:
            continue
        engine_cost = stats["cost_by_engine"].get(name, 0)
        report.append(f"• *{name}:* {request_count} requests, ${engine_cost:.4f}")

    last_limit = stats["last_rate_limit"]
    if last_limit:
        report.append(f"\n⚠️ Last rate limit: {last_limit}")

    active_engine = "claude" if _use_claude else "opencode"
    status = _runtime.get_status() if _runtime else {}
    report.append("")
    report.append(f"*Active:* `{active_engine}` / `{status.get('model', 'default')}`")

    return "\n".join(report)
# ---------------------------------------------------------------------------
# Scheduler Callback
# ---------------------------------------------------------------------------
def _on_scheduled_job(job: ScheduledJob) -> None:
    """
    Called by the scheduler when a job fires.

    Creates a synthetic IncomingMessage and routes it through ai_handler,
    then sends the response to the right channel. Any failure is logged and
    swallowed so that an error in one job does not propagate back into the
    scheduler.

    Args:
        job: The job that fired; its prompt, channel, thread and user name
            are copied onto the synthetic message.
    """
    # Separate the id from the prompt preview so the log line stays readable.
    logger.info(f"🔔 Scheduled job firing: {job.id} — '{job.prompt[:50]}'")

    # Build a synthetic message
    msg = IncomingMessage(
        text=job.prompt,
        user_id="system",
        user_name=job.user_name or "Scheduler",
        channel_id=job.channel_id,
        channel_name=f"scheduled-{job.id}",
        # One conversation per job, so repeated firings share a session.
        conversation_id=f"cron-{job.id}",
        source=job.channel_type,
        is_dm=True,
        # _process_action_tags reads thread_id from raw_event.
        raw_event={"thread_id": job.thread_id},
    )

    # Route through the AI handler
    try:
        response = ai_handler(msg)
        if response:
            _send_to_channel(
                channel_id=job.channel_id,
                text=response,
                thread_id=job.thread_id,
                channel_type=job.channel_type,
            )
    except Exception as e:
        logger.error(f"Scheduled job {job.id} handler failed: {e}", exc_info=True)
# ---------------------------------------------------------------------------
# Multi-Channel Send
# ---------------------------------------------------------------------------
def _send_to_channel(
    channel_id: str,
    text: str,
    thread_id: str | None,
    channel_type: str,
) -> None:
    """
    Deliver ``text`` to a channel through the adapter registered for
    ``channel_type``.

    When no adapter is registered under that name, the first registered
    adapter is used instead; when no adapters exist at all, a warning is
    logged and nothing is sent. Used by the scheduler and subagent manager
    to route responses back to the correct platform.
    """
    target = _adapters.get(channel_type)

    if not target:
        # Fallback: try the first available adapter
        for candidate in _adapters.values():
            target = candidate
            break
        else:
            logger.warning(
                f"No adapter for '{channel_type}' — cannot send message"
            )
            return

    target.send_message(
        channel_id=channel_id,
        text=text,
        thread_id=thread_id,
    )
# ---------------------------------------------------------------------------
# Runtime Factory (for subagents)
# ---------------------------------------------------------------------------
def _make_runtime() -> AnyRuntime:
    """
    Build a brand-new runtime for the currently selected engine.

    Configuration is re-read on every call; a ``--model`` value from the
    command line, when present, overrides the configured model for both
    engines. Used by the subagent manager.
    """
    global _use_claude, _cli_args

    cfg = load_config()

    cli_model = _cli_args.model if _cli_args else None
    if cli_model:
        cfg.runtime.model = cli_model
        cfg.claude.model = cli_model

    if not _use_claude:
        opencode_cfg = OpenCodeConfig(
            mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
            server_url=cfg.runtime.server_url,
            timeout_seconds=cfg.runtime.timeout_seconds,
            model=cfg.runtime.model,
            provider=cfg.runtime.provider,
            workspace_dir=cfg.runtime.workspace,
            format=cfg.runtime.format,
        )
        return OpenCodeRuntime(opencode_cfg)

    claude_cfg = ClaudeCodeConfig(
        model=cfg.claude.model,
        timeout_seconds=cfg.claude.timeout_seconds,
        max_turns=cfg.claude.max_turns,
        no_tools=cfg.claude.no_tools,
        allowed_tools=cfg.claude.allowed_tools,
    )
    return ClaudeCodeRuntime(claude_cfg)
# ---------------------------------------------------------------------------
# Formatting Helpers
# ---------------------------------------------------------------------------
def _format_status() -> str:
    """
    Build the /status reply.

    Always reports that the bot is online and the server time; runtime,
    channel, skill, scheduler and subagent details are included only for the
    components that are initialised.
    """
    global _runtime, _scheduler, _skills, _subagent_mgr

    report = ["🟢 *Aetheel is online*", ""]

    if _runtime:
        status = _runtime.get_status()
        engine = "claude" if _use_claude else "opencode"
        report += [
            f"• *Engine:* {engine}",
            f"• *Mode:* {status['mode']}",
            f"• *Model:* {status['model']}",
            f"• *Provider:* {status['provider']}",
            f"• *Active Sessions:* {status['active_sessions']}",
            f"• *Live Sessions:* {status.get('live_sessions', 0)}",
        ]

    # Adapter status
    if _adapters:
        report.append(f"• *Channels:* {', '.join(_adapters.keys())}")

    # Skills status
    if _skills:
        report.append(f"• *Skills Loaded:* {len(_skills.skills)}")

    # Scheduler status
    if _scheduler:
        report.append(f"• *Scheduled Jobs:* {len(_scheduler.list_jobs())}")

    # Subagents status
    if _subagent_mgr:
        report.append(f"• *Active Subagents:* {len(_subagent_mgr.list_active())}")

    report.append("")
    report.append(f"• *Time:* {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    return "\n".join(report)
def _format_help() -> str:
    """Return the static /help text listing every built-in chat command."""
    help_lines = (
        "🦾 *Aetheel — AI-Powered Assistant*\n",
        "\n",
        "*Commands:* (type as a regular message, no `/` needed)\n",
        "\n",
        "*General:*\n",
        "• `status` — Bot and runtime status\n",
        "• `help` — This help message\n",
        "• `time` — Server time\n",
        "• `sessions` — Active session count\n",
        "• `reload` — Reload config and skills\n",
        "\n",
        "*Runtime:*\n",
        "• `engine` — Show/switch AI engine (opencode or claude)\n",
        "• `model` — Show/switch AI model\n",
        "• `provider` — Show/switch provider (opencode only)\n",
        "• `usage` — Show LLM usage stats and costs\n",
        "\n",
        "*Config:*\n",
        "• `config` — View config summary\n",
        "• `config show` — View full config.json\n",
        "• `config set <key> <value>` — Edit a config value\n",
        "\n",
        "*Scheduler:*\n",
        "• `cron list` — List scheduled jobs\n",
        "• `cron remove <id>` — Remove a scheduled job\n",
        "• `subagents` — List active background tasks\n",
        "\n",
        "*AI Chat:*\n",
        "• Send any other message and the AI will respond\n",
        "• Each thread maintains its own conversation\n",
        "• DMs work too — just message me directly\n",
    )
    # Every entry already carries its own trailing newline.
    return "".join(help_lines)
def _format_sessions() -> str:
    """
    Report the active session count and purge stale sessions.

    The count is read before cleanup runs, so it still includes the sessions
    that the same reply reports as cleaned up.
    """
    global _runtime

    if not _runtime:
        return "⚠️ Runtime not initialized."

    active_before = _runtime.get_status()["active_sessions"]
    removed = _runtime.cleanup_sessions()
    return (
        f"🧵 *Active Sessions:* {active_before}\n"
        f"🧹 *Cleaned up:* {removed} stale sessions"
    )
def _format_subagents() -> str:
    """
    Format active subagent tasks for the `subagents` command.

    Returns:
        One bullet per active task showing its id, status and the first 60
        characters of the task description; a short notice when there are
        none or the subagent manager is not initialised.
    """
    global _subagent_mgr

    if not _subagent_mgr:
        return "⚠️ Subagent manager not initialized."

    active = _subagent_mgr.list_active()
    if not active:
        return "📋 No active subagents."

    lines = ["🤖 *Active Subagents:*\n"]
    # Separate the status from the task preview; without the separator the
    # two run together ("runningSummarise the report").
    lines.extend(
        f"• `{task.id}` — {task.status} — {task.task[:60]}" for task in active
    )
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Start Aetheel: load config, initialize subsystems, run the adapters.

    Startup sequence:
      0.  Parse CLI flags, load the config, apply flag overrides, configure
          logging.
      1.  Memory system (plus an initial sync).
      2.  Skills system, then the MCP config is written.
      3.  AI runtime (skipped with ``--test``).
      4.  Scheduler and heartbeat.
      5.  Subagent manager, hook system, webhook receiver.
      6.  Channel adapters (Slack, Telegram, Discord, WebChat).
      7.  Start the adapters; the last one is started blocking.

    Memory, skills, scheduler, heartbeat, subagents, hooks and webhooks are
    best-effort: an initialization failure is logged and the matching
    module-level global is set to ``None``. A channel adapter that fails to
    initialize is logged and simply not registered.

    The process exits with status 1 when no channel adapter was registered.
    On exit (including Ctrl-C) every started component is stopped; each
    shutdown step is isolated so one failure cannot skip the others.
    """
    global _runtime, _memory, _skills, _scheduler, _subagent_mgr, _heartbeat
    global _hook_mgr, _webhook_receiver
    global _adapters, _use_claude, _cli_args

    parser = argparse.ArgumentParser(
        description="Aetheel — AI-Powered Personal Assistant",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
python main.py Start with config-driven adapters
python main.py --claude Override: use Claude Code runtime
python main.py --test Echo-only handler (no AI)
python main.py --model anthropic/claude-sonnet-4-20250514
python main.py --log DEBUG Debug logging
All adapters and features are controlled via ~/.aetheel/config.json.
CLI flags are optional overrides.
""",
    )
    parser.add_argument("--test", action="store_true", help="Use echo handler for testing")
    parser.add_argument("--claude", action="store_true", help="Override: use Claude Code runtime")
    parser.add_argument("--cli", action="store_true", help="Override: force CLI mode (OpenCode)")
    parser.add_argument("--sdk", action="store_true", help="Override: force SDK mode (OpenCode)")
    parser.add_argument("--telegram", action="store_true", help="Override: enable Telegram adapter")
    parser.add_argument("--discord", action="store_true", help="Override: enable Discord adapter")
    parser.add_argument("--webchat", action="store_true", help="Override: enable WebChat adapter")
    parser.add_argument("--model", default=None, help="Override: model to use")
    parser.add_argument(
        "--log",
        default=os.environ.get("LOG_LEVEL", "INFO"),
        help="Log level (DEBUG, INFO, WARNING, ERROR)",
    )
    args = parser.parse_args()
    _cli_args = args

    # -------------------------------------------------------------------
    # 0. Load Configuration (config.json + .env + CLI overrides)
    # -------------------------------------------------------------------
    save_default_config()  # Create ~/.aetheel/config.json if missing
    cfg = load_config()

    # CLI flags override config (flags are optional overrides, config is primary)
    if args.claude:
        cfg.runtime.engine = "claude"
    _use_claude = cfg.runtime.engine == "claude"

    # NOTE: "INFO" is also the flag's default, so an explicit `--log INFO`
    # cannot be told apart from "not given" and falls back to cfg.log_level.
    log_level = args.log if args.log != "INFO" else cfg.log_level

    # Configure logging before anything below logs: records emitted while the
    # root logger is still unconfigured (e.g. the DEBUG "auto-enabled"
    # messages further down) would otherwise be dropped.
    logging.basicConfig(
        level=getattr(logging, log_level.upper(), logging.INFO),
        format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    logger.info(f"Config: {CONFIG_PATH}")

    if args.model:
        # The same override is applied to both engines' model settings.
        cfg.runtime.model = args.model
        cfg.claude.model = args.model
    if args.cli:
        cfg.runtime.mode = "cli"
    elif args.sdk:
        cfg.runtime.mode = "sdk"

    # CLI flags can enable adapters (but config is the primary source)
    if args.telegram:
        cfg.telegram.enabled = True
    if args.discord:
        cfg.discord.enabled = True
    if args.webchat:
        cfg.webchat.enabled = True

    # Auto-enable adapters when tokens are present (even if not explicitly enabled)
    if cfg.telegram.bot_token and not cfg.telegram.enabled:
        cfg.telegram.enabled = True
        logger.debug("Telegram auto-enabled: token present")
    if cfg.discord.bot_token and not cfg.discord.enabled:
        cfg.discord.enabled = True
        logger.debug("Discord auto-enabled: token present")

    # -------------------------------------------------------------------
    # 1. Initialize Memory System
    # -------------------------------------------------------------------
    workspace_dir = os.path.expanduser(cfg.memory.workspace)
    db_path = os.path.expanduser(cfg.memory.db_path)
    try:
        mem_config = MemoryConfig(
            workspace_dir=workspace_dir,
            db_path=db_path,
        )
        _memory = MemoryManager(mem_config)
        logger.info(f"Memory system initialized: workspace={workspace_dir}")
        stats = asyncio.run(_memory.sync())
        logger.info(
            f"Memory sync: {stats.get('files_indexed', 0)} files indexed, "
            f"{stats.get('chunks_created', 0)} chunks"
        )
    except Exception as e:
        logger.warning(f"Memory system init failed (continuing without): {e}")
        _memory = None

    # -------------------------------------------------------------------
    # 2. Initialize Skills System
    # -------------------------------------------------------------------
    try:
        _skills = SkillsManager(workspace_dir)
        loaded = _skills.load_all()
        logger.info(f"Skills system initialized: {len(loaded)} skill(s)")
    except Exception as e:
        logger.warning(f"Skills system init failed (continuing without): {e}")
        _skills = None

    # -------------------------------------------------------------------
    # 2½. Write MCP config (before runtime sees it)
    # -------------------------------------------------------------------
    write_mcp_config(cfg.mcp, cfg.memory.workspace, _use_claude)

    # -------------------------------------------------------------------
    # 3. Initialize AI Runtime
    # -------------------------------------------------------------------
    runtime_label = "echo (test mode)"
    if not args.test:
        if _use_claude:
            claude_config = ClaudeCodeConfig(
                model=cfg.claude.model,
                timeout_seconds=cfg.claude.timeout_seconds,
                max_turns=cfg.claude.max_turns,
                no_tools=cfg.claude.no_tools,
                allowed_tools=cfg.claude.allowed_tools,
            )
            _runtime = ClaudeCodeRuntime(claude_config)
            runtime_label = f"claude-code, model={claude_config.model or 'default'}"
        else:
            oc_config = OpenCodeConfig(
                mode=RuntimeMode.SDK if cfg.runtime.mode == "sdk" else RuntimeMode.CLI,
                server_url=cfg.runtime.server_url,
                timeout_seconds=cfg.runtime.timeout_seconds,
                model=cfg.runtime.model,
                provider=cfg.runtime.provider,
                workspace_dir=cfg.runtime.workspace,
                format=cfg.runtime.format,
            )
            _runtime = OpenCodeRuntime(oc_config)
            runtime_label = (
                f"opencode/{oc_config.mode.value}, "
                f"model={oc_config.model or 'default'}"
            )

    # -------------------------------------------------------------------
    # 4. Initialize Scheduler
    # -------------------------------------------------------------------
    try:
        _scheduler = Scheduler(callback=_on_scheduled_job)
        _scheduler.start()
        logger.info("Scheduler initialized")
    except Exception as e:
        logger.warning(f"Scheduler init failed (continuing without): {e}")
        _scheduler = None

    # -------------------------------------------------------------------
    # 4b. Initialize Heartbeat System
    # -------------------------------------------------------------------
    heartbeat_count = 0
    if _scheduler:
        try:
            _heartbeat = HeartbeatRunner(
                scheduler=_scheduler,
                ai_handler_fn=ai_handler,
                send_fn=_send_to_channel,
                config=cfg.heartbeat,
                workspace_dir=workspace_dir,
            )
            heartbeat_count = _heartbeat.start()
            logger.info(f"Heartbeat initialized: {heartbeat_count} task(s)")
        except Exception as e:
            logger.warning(f"Heartbeat init failed (continuing without): {e}")
            _heartbeat = None

    # -------------------------------------------------------------------
    # 5. Initialize Subagent Manager
    # -------------------------------------------------------------------
    if _runtime:
        try:
            _subagent_mgr = SubagentManager(
                runtime_factory=_make_runtime,
                send_fn=_send_to_channel,
            )
            logger.info("Subagent manager initialized")
        except Exception as e:
            logger.warning(f"Subagent manager init failed: {e}")
            _subagent_mgr = None

    # -------------------------------------------------------------------
    # 5b. Initialize Hook System
    # -------------------------------------------------------------------
    hook_count = 0
    if cfg.hooks.enabled:
        try:
            _hook_mgr = HookManager(workspace_dir=workspace_dir)
            hooks_found = _hook_mgr.discover()
            hook_count = len(hooks_found)
            logger.info(f"Hook system initialized: {hook_count} hook(s)")
        except Exception as e:
            logger.warning(f"Hook system init failed (continuing without): {e}")
            _hook_mgr = None

    # -------------------------------------------------------------------
    # 5c. Initialize Webhook Receiver
    # -------------------------------------------------------------------
    if cfg.webhooks.enabled:
        try:
            # Imported lazily so the module is only loaded when webhooks are on.
            from webhooks.receiver import WebhookReceiver, WebhookConfig as WHConfig

            wh_config = WHConfig(
                enabled=cfg.webhooks.enabled,
                port=cfg.webhooks.port,
                host=cfg.webhooks.host,
                token=cfg.webhooks.token,
            )
            _webhook_receiver = WebhookReceiver(
                ai_handler_fn=ai_handler,
                send_fn=_send_to_channel,
                config=wh_config,
            )
            _webhook_receiver.start_async()
            logger.info(
                f"Webhook receiver started at "
                f"http://{cfg.webhooks.host}:{cfg.webhooks.port}/hooks/"
            )
        except Exception as e:
            logger.warning(f"Webhook receiver init failed (continuing without): {e}")
            _webhook_receiver = None

    # -------------------------------------------------------------------
    # 6. Initialize Channel Adapters
    # -------------------------------------------------------------------
    # Choose the message handler
    handler = echo_handler if args.test else ai_handler

    # Slack adapter (enabled when tokens are present and not disabled)
    if cfg.slack.enabled and cfg.slack.bot_token and cfg.slack.app_token:
        try:
            slack = SlackAdapter(
                bot_token=cfg.slack.bot_token,
                app_token=cfg.slack.app_token,
                log_level=log_level,
            )
            slack.on_message(handler)
            _adapters["slack"] = slack
            logger.info("Slack adapter registered")
        except Exception as e:
            logger.error(f"Slack adapter failed to initialize: {e}")
    elif cfg.slack.enabled:
        logger.warning("Slack enabled but tokens not set — Slack adapter disabled")

    # Telegram adapter (enabled via config or token auto-detection)
    if cfg.telegram.enabled:
        if cfg.telegram.bot_token:
            try:
                from adapters.telegram_adapter import TelegramAdapter

                telegram = TelegramAdapter(bot_token=cfg.telegram.bot_token)
                telegram.on_message(handler)
                _adapters["telegram"] = telegram
                logger.info("Telegram adapter registered")
            except Exception as e:
                logger.error(f"Telegram adapter failed to initialize: {e}")
        else:
            logger.warning(
                "Telegram enabled but TELEGRAM_BOT_TOKEN not set. "
                "Get a token from @BotFather on Telegram."
            )

    # Discord adapter (enabled via config or token auto-detection)
    if cfg.discord.enabled:
        if cfg.discord.bot_token:
            try:
                from adapters.discord_adapter import DiscordAdapter

                discord_adapter = DiscordAdapter(
                    bot_token=cfg.discord.bot_token,
                    listen_channels=cfg.discord.listen_channels or None,
                )
                discord_adapter.on_message(handler)
                _adapters["discord"] = discord_adapter
                logger.info("Discord adapter registered")
            except Exception as e:
                logger.error(f"Discord adapter failed to initialize: {e}")
        else:
            logger.warning(
                "Discord enabled but DISCORD_BOT_TOKEN not set. "
                "Get a token from https://discord.com/developers/applications"
            )

    # WebChat adapter (enabled via config)
    if cfg.webchat.enabled:
        try:
            from adapters.webchat_adapter import WebChatAdapter

            webchat = WebChatAdapter(
                host=cfg.webchat.host,
                port=cfg.webchat.port,
            )
            webchat.on_message(handler)
            _adapters["webchat"] = webchat
            logger.info(f"WebChat adapter registered at http://{cfg.webchat.host}:{cfg.webchat.port}")
        except Exception as e:
            logger.error(f"WebChat adapter failed to initialize: {e}")

    if not _adapters:
        print("❌ No channel adapters initialized!")
        print(" Configure adapters in ~/.aetheel/config.json")
        print(" Set tokens in .env (SLACK_BOT_TOKEN, DISCORD_BOT_TOKEN, etc.)")
        print(" Or enable webchat: {\"webchat\": {\"enabled\": true}}")
        sys.exit(1)

    # Start file watching for automatic memory re-indexing
    if _memory:
        _memory.start_watching()

    # -------------------------------------------------------------------
    # 7. Start Adapters
    # -------------------------------------------------------------------
    # Render each optional subsystem as an explicit on/off marker so the
    # banner actually shows which ones came up.
    scheduler_state = "✅" if _scheduler else "❌"
    heartbeat_state = f"✅ {heartbeat_count} tasks" if _heartbeat else "❌"
    subagents_state = "✅" if _subagent_mgr else "❌"
    hooks_state = f"✅ {hook_count} hooks" if _hook_mgr else "❌"
    webhooks_state = f"✅ port {cfg.webhooks.port}" if _webhook_receiver else "❌"

    logger.info("=" * 60)
    logger.info(" Aetheel Starting")
    logger.info("=" * 60)
    logger.info(f" Config: {CONFIG_PATH}")
    logger.info(f" Runtime: {runtime_label}")
    logger.info(f" Channels: {', '.join(_adapters.keys())}")
    logger.info(f" Skills: {len(_skills.skills) if _skills else 0}")
    logger.info(f" Scheduler: {scheduler_state}")
    logger.info(f" Heartbeat: {heartbeat_state}")
    logger.info(f" Subagents: {subagents_state}")
    logger.info(f" Hooks: {hooks_state}")
    logger.info(f" Webhooks: {webhooks_state}")
    logger.info("=" * 60)

    def _shutdown_step(label, action, *action_args):
        """Run one shutdown action; log a failure instead of propagating it."""
        try:
            action(*action_args)
        except Exception as e:
            logger.warning(f"{label} failed during shutdown: {e}")

    try:
        # Fire gateway:startup hook
        if _hook_mgr:
            _hook_mgr.trigger(HookEvent(type="gateway", action="startup"))

        if len(_adapters) == 1:
            # Single adapter — start it blocking
            adapter = next(iter(_adapters.values()))
            adapter.start()
        else:
            # Multiple adapters — start all but last async, last blocking
            adapter_list = list(_adapters.values())
            for adapter in adapter_list[:-1]:
                adapter.start_async()
            adapter_list[-1].start()  # blocking
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        # Every step goes through _shutdown_step so that a failure in one
        # component cannot prevent the remaining ones from being stopped.

        # Fire gateway:shutdown hook
        if _hook_mgr:
            _shutdown_step(
                "gateway:shutdown hook",
                _hook_mgr.trigger,
                HookEvent(type="gateway", action="shutdown"),
            )

        # Cleanup
        if _webhook_receiver:
            _shutdown_step("Webhook receiver stop", _webhook_receiver.stop)
        for adapter_name, adapter in _adapters.items():
            _shutdown_step(f"{adapter_name} adapter stop", adapter.stop)
        if _scheduler:
            _shutdown_step("Scheduler stop", _scheduler.stop)
        if _memory:
            _shutdown_step("Memory close", _memory.close)
        logger.info("Aetheel stopped. Goodbye! 👋")
# Script entry point: start the assistant only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()