""" Aetheel Discord Adapter ======================== Connects to Discord via the Bot Gateway using discord.py. Features: - DMs and @mentions in guild channels - Reply threading (off / first / all) - Channel history context injection (configurable per-channel) - Ack reactions while processing - Typing indicators while processing - Reaction handling (off / own / all) - Interactive components (buttons, selects, modals) - Native slash commands (/ask, /status, /help) - Exec approval buttons for dangerous AI tool use - Chunked replies for Discord's 2000-char limit Setup: 1. Create a bot at https://discord.com/developers/applications 2. Enable MESSAGE CONTENT intent in Bot settings 3. Set DISCORD_BOT_TOKEN in .env 4. Invite bot with: OAuth2 → URL Generator → bot + applications.commands scope 5. Start with: python main.py --discord """ from __future__ import annotations import asyncio import logging import os import threading import uuid from datetime import datetime, timezone from typing import Any, Callable import discord from discord import app_commands from adapters.base import BaseAdapter, IncomingMessage logger = logging.getLogger("aetheel.discord") def resolve_discord_token(explicit: str | None = None) -> str: """Resolve the Discord bot token.""" token = (explicit or os.environ.get("DISCORD_BOT_TOKEN", "")).strip() if not token: raise ValueError( "Discord bot token is required. " "Set DISCORD_BOT_TOKEN environment variable or pass it explicitly." ) return token # --------------------------------------------------------------------------- # Exec Approval State # --------------------------------------------------------------------------- class _ExecApprovalStore: """Thread-safe store for pending exec approval requests.""" def __init__(self) -> None: self._pending: dict[str, asyncio.Future[bool]] = {} self._lock = threading.Lock() def create(self, approval_id: str, loop: asyncio.AbstractEventLoop) -> asyncio.Future[bool]: future = loop.create_future() with self._lock: self._pending[approval_id] = future return future def resolve(self, approval_id: str, approved: bool) -> bool: with self._lock: future = self._pending.pop(approval_id, None) if future and not future.done(): future.get_loop().call_soon_threadsafe(future.set_result, approved) return True return False def cancel(self, approval_id: str) -> None: with self._lock: future = self._pending.pop(approval_id, None) if future and not future.done(): future.get_loop().call_soon_threadsafe(future.set_result, False) _approval_store = _ExecApprovalStore() # --------------------------------------------------------------------------- # Discord Adapter # --------------------------------------------------------------------------- class DiscordAdapter(BaseAdapter): """ Discord channel adapter using discord.py with interactive features. Supports: DMs, @mentions, reply threading, history context, ack reactions, typing indicators, reaction handling, slash commands, interactive components, and exec approval buttons. """ def __init__( self, bot_token: str | None = None, listen_channels: list[str] | None = None, reply_to_mode: str = "first", history_enabled: bool = True, history_limit: int = 20, channel_overrides: dict[str, Any] | None = None, ack_reaction: str = "👀", typing_indicator: bool = True, reaction_mode: str = "own", exec_approvals: bool = False, exec_approval_tools: list[str] | None = None, slash_commands: bool = True, components_enabled: bool = True, ): super().__init__() self._token = resolve_discord_token(bot_token) self._bot_user_id: int = 0 self._bot_user_name: str = "" self._running = False self._thread: threading.Thread | None = None self._loop: asyncio.AbstractEventLoop | None = None # Config self._reply_to_mode = reply_to_mode self._history_enabled = history_enabled self._history_limit = history_limit self._channel_overrides = channel_overrides or {} self._ack_reaction = ack_reaction self._typing_indicator = typing_indicator self._reaction_mode = reaction_mode self._exec_approvals = exec_approvals self._exec_approval_tools = exec_approval_tools or ["Bash", "Write", "Edit"] self._slash_commands = slash_commands self._components_enabled = components_enabled # Listen channels (no @mention needed) if listen_channels is not None: self._listen_channels: set[str] = set(listen_channels) else: raw = os.environ.get("DISCORD_LISTEN_CHANNELS", "").strip() self._listen_channels = { ch.strip() for ch in raw.split(",") if ch.strip() } # Set up intents intents = discord.Intents.default() intents.message_content = True intents.dm_messages = True intents.reactions = True intents.members = True self._client = discord.Client(intents=intents) self._tree = app_commands.CommandTree(self._client) self._register_handlers() # ------------------------------------------------------------------- # BaseAdapter implementation # ------------------------------------------------------------------- @property def source_name(self) -> str: return "discord" def start(self) -> None: logger.info("Starting Discord adapter...") self._running = True self._client.run(self._token, log_handler=None) def start_async(self) -> None: self._thread = threading.Thread( target=self._run_in_thread, daemon=True, name="discord-adapter" ) self._thread.start() logger.info("Discord adapter started in background thread") def stop(self) -> None: self._running = False if self._loop and not self._loop.is_closed(): asyncio.run_coroutine_threadsafe(self._client.close(), self._loop) logger.info("Discord adapter stopped.") def send_message( self, channel_id: str, text: str, thread_id: str | None = None, reply_to_message_id: int | None = None, components: list[discord.ui.View] | None = None, ) -> None: """Send a message with optional reply threading and components.""" if not text.strip(): return async def _send(): target = self._client.get_channel(int(channel_id)) if target is None: try: target = await self._client.fetch_channel(int(channel_id)) except discord.NotFound: logger.error(f"Channel {channel_id} not found") return # Build reference for reply threading reference = None if reply_to_message_id and self._reply_to_mode != "off": try: reference = discord.MessageReference( message_id=reply_to_message_id, channel_id=int(channel_id), ) except Exception: pass chunks = _chunk_text(text, 2000) view = components[0] if components and self._components_enabled else None for i, chunk in enumerate(chunks): # Only attach reference to first chunk, view to last chunk ref = reference if i == 0 else None v = view if i == len(chunks) - 1 else None try: await target.send(chunk, reference=ref, view=v) except discord.HTTPException as e: # If reply reference fails (deleted message), send without it if ref and e.code == 50035: await target.send(chunk, view=v) else: raise if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(_send(), self._loop) else: asyncio.run(_send()) # ------------------------------------------------------------------- # History Context # ------------------------------------------------------------------- async def _fetch_history(self, channel: Any, channel_id: str) -> str: """Fetch recent message history for context injection.""" # Check per-channel override override = self._channel_overrides.get(channel_id, {}) if isinstance(override, dict): ch_enabled = override.get("history_enabled", self._history_enabled) ch_limit = override.get("history_limit", self._history_limit) else: ch_enabled = getattr(override, "history_enabled", None) if ch_enabled is None: ch_enabled = self._history_enabled ch_limit = getattr(override, "history_limit", None) if ch_limit is None: ch_limit = self._history_limit if not ch_enabled or ch_limit <= 0: return "" try: messages: list[str] = [] async for msg in channel.history(limit=ch_limit): if msg.author == self._client.user: role = "assistant" else: role = msg.author.display_name or msg.author.name content = msg.content or "" if content: messages.append(f"[{role}]: {content[:500]}") messages.reverse() if messages: return "\n".join(messages) except (discord.Forbidden, discord.HTTPException) as e: logger.debug(f"Could not fetch history for {channel_id}: {e}") return "" # ------------------------------------------------------------------- # Ack Reaction & Typing # ------------------------------------------------------------------- async def _add_ack_reaction(self, message: discord.Message) -> bool: """Add ack reaction to message. Returns True if added.""" if not self._ack_reaction: return False try: await message.add_reaction(self._ack_reaction) return True except (discord.Forbidden, discord.HTTPException) as e: logger.debug(f"Could not add ack reaction: {e}") return False async def _remove_ack_reaction(self, message: discord.Message) -> None: """Remove ack reaction from message.""" if not self._ack_reaction: return try: await message.remove_reaction(self._ack_reaction, self._client.user) except (discord.Forbidden, discord.HTTPException, discord.NotFound): pass # ------------------------------------------------------------------- # Exec Approvals # ------------------------------------------------------------------- async def _request_exec_approval( self, channel: Any, tool_name: str, description: str, user_id: str, ) -> bool: """Send an approval button prompt and wait for user response.""" if not self._exec_approvals or not self._loop: return True # Auto-approve if disabled if tool_name not in self._exec_approval_tools: return True # Not a gated tool approval_id = str(uuid.uuid4())[:8] view = _ExecApprovalView(approval_id, user_id) embed = discord.Embed( title="⚠️ Exec Approval Required", description=f"**Tool:** `{tool_name}`\n**Action:** {description[:500]}", color=discord.Color.orange(), ) embed.set_footer(text=f"Only <@{user_id}> can approve • ID: {approval_id}") try: await channel.send(embed=embed, view=view) except discord.HTTPException as e: logger.error(f"Failed to send approval prompt: {e}") return True # Fail open future = _approval_store.create(approval_id, self._loop) try: return await asyncio.wait_for(future, timeout=120.0) except asyncio.TimeoutError: _approval_store.cancel(approval_id) try: await channel.send(f"⏰ Approval `{approval_id}` timed out — action denied.") except discord.HTTPException: pass return False # ------------------------------------------------------------------- # Interactive Components: Send with buttons/selects # ------------------------------------------------------------------- async def send_components_message( self, channel_id: str, text: str, buttons: list[dict[str, Any]] | None = None, select_options: list[dict[str, str]] | None = None, callback: Callable[[discord.Interaction, str], Any] | None = None, ) -> None: """Send a message with interactive buttons or a select menu.""" if not self._components_enabled: # Fall back to plain text self.send_message(channel_id, text) return async def _send(): target = self._client.get_channel(int(channel_id)) if target is None: try: target = await self._client.fetch_channel(int(channel_id)) except discord.NotFound: return view = discord.ui.View(timeout=300) if buttons: for btn_cfg in buttons[:5]: # Discord max 5 buttons per row style_map = { "primary": discord.ButtonStyle.primary, "secondary": discord.ButtonStyle.secondary, "success": discord.ButtonStyle.success, "danger": discord.ButtonStyle.danger, } style = style_map.get(btn_cfg.get("style", "primary"), discord.ButtonStyle.primary) button = discord.ui.Button( label=btn_cfg.get("label", "Button"), style=style, custom_id=btn_cfg.get("custom_id", str(uuid.uuid4())[:8]), ) async def _btn_callback(interaction: discord.Interaction, cid=button.custom_id): await interaction.response.defer() if callback: await callback(interaction, cid) button.callback = _btn_callback view.add_item(button) if select_options: options = [ discord.SelectOption( label=opt.get("label", "Option"), value=opt.get("value", opt.get("label", "option")), description=opt.get("description"), ) for opt in select_options[:25] # Discord max 25 options ] select = discord.ui.Select( placeholder="Choose an option...", options=options, ) async def _select_callback(interaction: discord.Interaction): await interaction.response.defer() if callback: await callback(interaction, select.values[0] if select.values else "") select.callback = _select_callback view.add_item(select) await target.send(text, view=view) if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(_send(), self._loop) # ------------------------------------------------------------------- # Event Handlers # ------------------------------------------------------------------- def _register_handlers(self) -> None: """Register Discord event handlers.""" @self._client.event async def on_ready(): if self._client.user: self._bot_user_id = self._client.user.id self._bot_user_name = self._client.user.name self._loop = asyncio.get_running_loop() self._running = True # Register slash commands if self._slash_commands: await self._register_slash_commands() logger.info("=" * 60) logger.info(" Aetheel Discord Adapter") logger.info("=" * 60) logger.info(f" Bot: @{self._bot_user_name} ({self._bot_user_id})") guilds = [g.name for g in self._client.guilds] logger.info(f" Guilds: {', '.join(guilds) or 'none'}") logger.info(f" Handlers: {len(self._message_handlers)} registered") logger.info(f" Reply: {self._reply_to_mode}") logger.info(f" History: {'on' if self._history_enabled else 'off'} (limit={self._history_limit})") logger.info(f" Ack: {self._ack_reaction or 'off'}") logger.info(f" Typing: {'on' if self._typing_indicator else 'off'}") logger.info(f" Reactions: {self._reaction_mode}") logger.info(f" Slash: {'on' if self._slash_commands else 'off'}") logger.info(f" Components:{'on' if self._components_enabled else 'off'}") logger.info(f" Approvals: {'on' if self._exec_approvals else 'off'}") if self._listen_channels: logger.info(f" Listen: {', '.join(self._listen_channels)}") logger.info("=" * 60) @self._client.event async def on_message(message: discord.Message): if message.author == self._client.user: return if message.author.bot: return is_dm = isinstance(message.channel, discord.DMChannel) text = message.content if not is_dm: channel_str = str(message.channel.id) is_listen_channel = channel_str in self._listen_channels if self._client.user and self._client.user.mentioned_in(message): text = text.replace(f"<@{self._bot_user_id}>", "").strip() text = text.replace(f"<@!{self._bot_user_id}>", "").strip() elif not is_listen_channel: return if not text.strip(): return user_name = message.author.display_name or message.author.name channel_name = ( f"DM with {user_name}" if is_dm else getattr(message.channel, "name", str(message.channel.id)) ) # Ack reaction ack_added = await self._add_ack_reaction(message) # Fetch history context history_context = "" if not is_dm: history_context = await self._fetch_history( message.channel, str(message.channel.id) ) msg = IncomingMessage( text=text, user_id=str(message.author.id), user_name=user_name, channel_id=str(message.channel.id), channel_name=channel_name, conversation_id=str(message.channel.id), source="discord", is_dm=is_dm, timestamp=( message.created_at.replace(tzinfo=timezone.utc) if message.created_at.tzinfo is None else message.created_at ), raw_event={ "thread_id": None, "message_id": message.id, "guild_id": message.guild.id if message.guild else None, "history_context": history_context, }, ) logger.info( f"📨 [Discord] {user_name} in {channel_name}: {text[:100]}" ) # Process with typing indicator async def _process(): try: if self._typing_indicator: async with message.channel.typing(): response = await asyncio.to_thread(self._dispatch_and_capture, msg) else: response = await asyncio.to_thread(self._dispatch_and_capture, msg) finally: if ack_added: await self._remove_ack_reaction(message) if response: # Determine reply reference based on mode reply_to_id = None if self._reply_to_mode == "first": reply_to_id = message.id elif self._reply_to_mode == "all": reply_to_id = message.id await self._send_async( str(message.channel.id), response, reply_to_id ) await _process() @self._client.event async def on_reaction_add(reaction: discord.Reaction, user: discord.User): """Handle reaction events based on reaction_mode.""" if self._reaction_mode == "off": return if user == self._client.user: return if user.bot: return # "own" = only reactions on bot's messages if self._reaction_mode == "own": if reaction.message.author != self._client.user: return emoji_str = str(reaction.emoji) user_name = getattr(user, "display_name", user.name) channel_id = str(reaction.message.channel.id) channel_name = getattr( reaction.message.channel, "name", channel_id ) msg = IncomingMessage( text=f"[Reaction: {emoji_str} on message: {reaction.message.content[:200] if reaction.message.content else '(no text)'}]", user_id=str(user.id), user_name=user_name, channel_id=channel_id, channel_name=channel_name, conversation_id=channel_id, source="discord", is_dm=isinstance(reaction.message.channel, discord.DMChannel), raw_event={ "type": "reaction", "emoji": emoji_str, "message_id": reaction.message.id, "original_text": reaction.message.content or "", }, ) logger.info( f"👍 [Discord] {user_name} reacted {emoji_str} in {channel_name}" ) await asyncio.to_thread(self._dispatch, msg) # ------------------------------------------------------------------- # Slash Commands # ------------------------------------------------------------------- async def _register_slash_commands(self) -> None: """Register Discord slash commands.""" @self._tree.command(name="ask", description="Ask Aetheel a question") async def slash_ask(interaction: discord.Interaction, message: str): await interaction.response.defer(thinking=True) msg = IncomingMessage( text=message, user_id=str(interaction.user.id), user_name=interaction.user.display_name or interaction.user.name, channel_id=str(interaction.channel_id), channel_name=getattr(interaction.channel, "name", str(interaction.channel_id)), conversation_id=str(interaction.channel_id), source="discord", is_dm=interaction.guild is None, raw_event={"type": "slash_command", "command": "ask"}, ) response = await asyncio.to_thread(self._dispatch_and_capture, msg) text = response or "No response." chunks = _chunk_text(text, 2000) await interaction.followup.send(chunks[0]) for chunk in chunks[1:]: await interaction.followup.send(chunk) @self._tree.command(name="status", description="Check Aetheel status") async def slash_status(interaction: discord.Interaction): msg = IncomingMessage( text="status", user_id=str(interaction.user.id), user_name=interaction.user.display_name or interaction.user.name, channel_id=str(interaction.channel_id), channel_name=getattr(interaction.channel, "name", str(interaction.channel_id)), conversation_id=str(interaction.channel_id), source="discord", is_dm=interaction.guild is None, raw_event={"type": "slash_command", "command": "status"}, ) response = await asyncio.to_thread(self._dispatch_and_capture, msg) await interaction.response.send_message( _chunk_text(response or "No status.", 2000)[0] ) @self._tree.command(name="help", description="Show Aetheel help") async def slash_help(interaction: discord.Interaction): msg = IncomingMessage( text="help", user_id=str(interaction.user.id), user_name=interaction.user.display_name or interaction.user.name, channel_id=str(interaction.channel_id), channel_name=getattr(interaction.channel, "name", str(interaction.channel_id)), conversation_id=str(interaction.channel_id), source="discord", is_dm=interaction.guild is None, raw_event={"type": "slash_command", "command": "help"}, ) response = await asyncio.to_thread(self._dispatch_and_capture, msg) await interaction.response.send_message( _chunk_text(response or "No help available.", 2000)[0] ) try: synced = await self._tree.sync() logger.info(f"Synced {len(synced)} slash command(s)") except Exception as e: logger.error(f"Failed to sync slash commands: {e}") # ------------------------------------------------------------------- # Internal helpers # ------------------------------------------------------------------- def _dispatch_and_capture(self, msg: IncomingMessage) -> str | None: """Dispatch to handlers and return the first response.""" for handler in self._message_handlers: try: response = handler(msg) if response: return response except Exception as e: logger.error(f"[discord] Handler error: {e}", exc_info=True) return "⚠️ Something went wrong processing your message." return None async def _send_async( self, channel_id: str, text: str, reply_to_message_id: int | None = None, ) -> None: """Send a message asynchronously (called from within the event loop).""" target = self._client.get_channel(int(channel_id)) if target is None: try: target = await self._client.fetch_channel(int(channel_id)) except discord.NotFound: logger.error(f"Channel {channel_id} not found") return reference = None if reply_to_message_id and self._reply_to_mode != "off": reference = discord.MessageReference( message_id=reply_to_message_id, channel_id=int(channel_id), ) chunks = _chunk_text(text, 2000) for i, chunk in enumerate(chunks): ref = reference if i == 0 else None try: await target.send(chunk, reference=ref) except discord.HTTPException as e: if ref and e.code == 50035: await target.send(chunk) else: raise def _run_in_thread(self) -> None: self._running = True self._client.run(self._token, log_handler=None) # --------------------------------------------------------------------------- # Exec Approval View # --------------------------------------------------------------------------- class _ExecApprovalView(discord.ui.View): """Button view for exec approval prompts.""" def __init__(self, approval_id: str, allowed_user_id: str): super().__init__(timeout=120) self._approval_id = approval_id self._allowed_user_id = allowed_user_id @discord.ui.button(label="Approve", style=discord.ButtonStyle.success, emoji="✅") async def approve(self, interaction: discord.Interaction, button: discord.ui.Button): if str(interaction.user.id) != self._allowed_user_id: await interaction.response.send_message( "Only the original requester can approve.", ephemeral=True ) return _approval_store.resolve(self._approval_id, True) await interaction.response.edit_message( content="✅ Approved", view=None ) self.stop() @discord.ui.button(label="Deny", style=discord.ButtonStyle.danger, emoji="❌") async def deny(self, interaction: discord.Interaction, button: discord.ui.Button): if str(interaction.user.id) != self._allowed_user_id: await interaction.response.send_message( "Only the original requester can deny.", ephemeral=True ) return _approval_store.resolve(self._approval_id, False) await interaction.response.edit_message( content="❌ Denied", view=None ) self.stop() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _chunk_text(text: str, limit: int = 2000) -> list[str]: """Split text into chunks respecting Discord's character limit.""" if len(text) <= limit: return [text] chunks = [] remaining = text while remaining: if len(remaining) <= limit: chunks.append(remaining) break cut = limit newline_pos = remaining.rfind("\n", 0, limit) if newline_pos > limit // 2: cut = newline_pos + 1 else: space_pos = remaining.rfind(" ", 0, limit) if space_pos > limit // 2: cut = space_pos + 1 chunks.append(remaining[:cut]) remaining = remaining[cut:] return chunks