""" Aetheel Base Adapter ==================== Abstract base class for all channel adapters (Slack, Telegram, Discord, etc.). Every adapter converts platform-specific events into a channel-agnostic IncomingMessage and routes responses back through send_message(). The AI handler only sees IncomingMessage — it never knows which platform the message came from. """ from __future__ import annotations import logging from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Callable logger = logging.getLogger("aetheel.adapters") # --------------------------------------------------------------------------- # Channel-Agnostic Message # --------------------------------------------------------------------------- @dataclass class IncomingMessage: """ Channel-agnostic incoming message. Every adapter converts its platform-specific event into this format before passing it to the message handler. This is the ONLY type the AI handler sees. """ text: str user_id: str user_name: str channel_id: str # platform-specific channel/chat ID channel_name: str conversation_id: str # unique ID for session isolation (thread, chat, etc.) source: str # "slack", "telegram", "discord", etc. is_dm: bool timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) raw_event: dict[str, Any] = field(default_factory=dict, repr=False) # Type alias for the message handler callback MessageHandler = Callable[[IncomingMessage], str | None] # --------------------------------------------------------------------------- # Abstract Base Adapter # --------------------------------------------------------------------------- class BaseAdapter(ABC): """ Abstract base class for channel adapters. Each adapter must: 1. Connect to the messaging platform 2. Convert incoming events into IncomingMessage objects 3. Call registered handlers with the IncomingMessage 4. Send responses back to the platform via send_message() """ def __init__(self) -> None: self._message_handlers: list[MessageHandler] = [] def on_message(self, handler: MessageHandler) -> MessageHandler: """ Register a message handler (can be used as a decorator). The handler receives an IncomingMessage and should return a response string or None. """ self._message_handlers.append(handler) return handler @abstractmethod def start(self) -> None: """Start the adapter (blocking).""" ... @abstractmethod def start_async(self) -> None: """Start the adapter in a background thread (non-blocking).""" ... @abstractmethod def stop(self) -> None: """Stop the adapter gracefully.""" ... @abstractmethod def send_message( self, channel_id: str, text: str, thread_id: str | None = None, ) -> None: """ Send a message to a channel/chat on this platform. Args: channel_id: Platform-specific channel/chat ID text: Message text thread_id: Optional thread/reply ID for threading """ ... @property @abstractmethod def source_name(self) -> str: """Short name for this adapter, e.g. 'slack', 'telegram'.""" ... def _dispatch(self, msg: IncomingMessage) -> None: """ Dispatch an IncomingMessage to all registered handlers. Called by subclasses after converting platform events. """ for handler in self._message_handlers: try: response = handler(msg) if response: self.send_message( channel_id=msg.channel_id, text=response, thread_id=msg.raw_event.get("thread_id"), ) except Exception as e: logger.error( f"[{self.source_name}] Handler error: {e}", exc_info=True ) try: self.send_message( channel_id=msg.channel_id, text="⚠️ Something went wrong processing your message.", thread_id=msg.raw_event.get("thread_id"), ) except Exception: pass