""" Aetheel Hook System =================== Event-driven lifecycle hooks inspired by OpenClaw's internal hook system. Hooks fire on lifecycle events (gateway startup, session new/reset, agent bootstrap) and let you run custom Python code at those moments. Hooks are discovered from HOOK.md files in the workspace and managed directories. Supported events: - gateway:startup — Gateway process starts (after adapters connect) - gateway:shutdown — Gateway process is shutting down - command:new — User starts a fresh session (/new) - command:reload — User reloads config (/reload) - agent:bootstrap — Before workspace files are injected into context - agent:response — After the agent produces a response Hook structure: ~/.aetheel/workspace/hooks//HOOK.md (workspace hooks) ~/.aetheel/hooks//HOOK.md (managed hooks) HOOK.md format: --- name: my-hook description: What this hook does events: [gateway:startup, command:new] enabled: true --- # Documentation goes here... Handler: hooks//handler.py with a `handle(event)` function. """ import importlib.util import logging import os import re import threading from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Callable logger = logging.getLogger("aetheel.hooks") # --------------------------------------------------------------------------- # Types # --------------------------------------------------------------------------- @dataclass class HookEvent: """An event passed to hook handlers.""" type: str # e.g. "gateway", "command", "agent" action: str # e.g. "startup", "new", "bootstrap" session_key: str = "" context: dict[str, Any] = field(default_factory=dict) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) messages: list[str] = field(default_factory=list) @property def event_key(self) -> str: return f"{self.type}:{self.action}" # Callback type: receives a HookEvent HookHandler = Callable[[HookEvent], None] @dataclass class HookEntry: """A discovered hook with metadata.""" name: str description: str events: list[str] # e.g. ["gateway:startup", "command:new"] enabled: bool = True source: str = "workspace" # "workspace", "managed", or "programmatic" base_dir: str = "" handler_path: str = "" _handler_fn: HookHandler | None = field(default=None, repr=False) # --------------------------------------------------------------------------- # Hook Manager # --------------------------------------------------------------------------- class HookManager: """ Discovers, loads, and triggers lifecycle hooks. Hooks are discovered from: 1. Workspace hooks: /hooks//HOOK.md 2. Managed hooks: ~/.aetheel/hooks//HOOK.md 3. Programmatic: registered via register() When an event fires, all hooks listening for that event are called in discovery order. Errors in one hook don't prevent others from running. """ def __init__(self, workspace_dir: str | None = None): self._hooks: list[HookEntry] = [] self._programmatic: dict[str, list[HookHandler]] = {} self._lock = threading.Lock() self._workspace_dir = workspace_dir def discover(self) -> list[HookEntry]: """Discover hooks from workspace and managed directories.""" self._hooks = [] dirs_to_scan: list[tuple[str, str]] = [] # Workspace hooks if self._workspace_dir: ws_hooks = os.path.join(self._workspace_dir, "hooks") if os.path.isdir(ws_hooks): dirs_to_scan.append((ws_hooks, "workspace")) # Managed hooks (~/.aetheel/hooks/) managed = os.path.expanduser("~/.aetheel/hooks") if os.path.isdir(managed): dirs_to_scan.append((managed, "managed")) for hooks_dir, source in dirs_to_scan: for entry_name in sorted(os.listdir(hooks_dir)): hook_dir = os.path.join(hooks_dir, entry_name) if not os.path.isdir(hook_dir): continue hook_md = os.path.join(hook_dir, "HOOK.md") if not os.path.isfile(hook_md): continue try: hook = self._parse_hook(hook_md, source) if hook and hook.enabled: self._hooks.append(hook) logger.info( f"Hook discovered: {hook.name} " f"(events={hook.events}, source={source})" ) except Exception as e: logger.warning(f"Failed to load hook from {hook_md}: {e}") logger.info(f"Hooks discovered: {len(self._hooks)} hook(s)") return list(self._hooks) def register(self, event_key: str, handler: HookHandler) -> None: """Register a programmatic hook handler for an event key.""" with self._lock: self._programmatic.setdefault(event_key, []).append(handler) def unregister(self, event_key: str, handler: HookHandler) -> None: """Unregister a programmatic hook handler.""" with self._lock: handlers = self._programmatic.get(event_key, []) if handler in handlers: handlers.remove(handler) def trigger(self, event: HookEvent) -> list[str]: """ Trigger all hooks listening for this event. Returns any messages hooks pushed to event.messages. """ event_key = event.event_key # File-based hooks for hook in self._hooks: if event_key in hook.events or event.type in hook.events: self._invoke_hook(hook, event) # Programmatic hooks with self._lock: type_handlers = list(self._programmatic.get(event.type, [])) specific_handlers = list(self._programmatic.get(event_key, [])) for handler in type_handlers + specific_handlers: try: handler(event) except Exception as e: logger.error(f"Programmatic hook error [{event_key}]: {e}") return list(event.messages) def list_hooks(self) -> list[HookEntry]: """List all discovered hooks.""" return list(self._hooks) # ------------------------------------------------------------------- # Internal # ------------------------------------------------------------------- def _invoke_hook(self, hook: HookEntry, event: HookEvent) -> None: """Invoke a file-based hook's handler.""" if hook._handler_fn is None: hook._handler_fn = self._load_handler(hook) if hook._handler_fn is None: return try: hook._handler_fn(event) except Exception as e: logger.error( f"Hook error [{hook.name}] on {event.event_key}: {e}", exc_info=True, ) def _load_handler(self, hook: HookEntry) -> HookHandler | None: """Load a hook's handler.py module and return its handle() function.""" handler_path = hook.handler_path if not handler_path or not os.path.isfile(handler_path): logger.debug(f"No handler.py for hook {hook.name}") return None try: spec = importlib.util.spec_from_file_location( f"hook_{hook.name}", handler_path ) if spec is None or spec.loader is None: return None module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) handler_fn = getattr(module, "handle", None) if callable(handler_fn): return handler_fn logger.warning(f"Hook {hook.name}: handler.py has no handle() function") return None except Exception as e: logger.error(f"Failed to load handler for hook {hook.name}: {e}") return None def _parse_hook(self, hook_md_path: str, source: str) -> HookEntry | None: """Parse a HOOK.md file into a HookEntry.""" with open(hook_md_path, "r", encoding="utf-8") as f: content = f.read() frontmatter, _ = self._split_frontmatter(content) if not frontmatter: return None name = self._extract_field(frontmatter, "name") if not name: name = os.path.basename(os.path.dirname(hook_md_path)) description = self._extract_field(frontmatter, "description") or "" events_raw = self._extract_field(frontmatter, "events") or "" events = self._parse_list(events_raw) enabled_raw = self._extract_field(frontmatter, "enabled") enabled = enabled_raw.lower() != "false" if enabled_raw else True base_dir = os.path.dirname(hook_md_path) handler_path = os.path.join(base_dir, "handler.py") return HookEntry( name=name, description=description, events=events, enabled=enabled, source=source, base_dir=base_dir, handler_path=handler_path, ) @staticmethod def _split_frontmatter(content: str) -> tuple[str, str]: match = re.match(r"^---\s*\n(.*?)\n---\s*\n(.*)", content, re.DOTALL) if match: return match.group(1), match.group(2) return "", content @staticmethod def _extract_field(frontmatter: str, field_name: str) -> str | None: pattern = rf"^{re.escape(field_name)}\s*:\s*(.+)$" match = re.search(pattern, frontmatter, re.MULTILINE) if match: value = match.group(1).strip().strip("'\"") return value return None @staticmethod def _parse_list(raw: str) -> list[str]: if not raw: return [] raw = raw.strip() if raw.startswith("[") and raw.endswith("]"): raw = raw[1:-1] return [item.strip().strip("'\"") for item in raw.split(",") if item.strip()]