import type { Event, MessagePayload, HeartbeatPayload, CronPayload, HookPayload } from "./event-queue.js";
import type { MarkdownConfigLoader } from "./markdown-config-loader.js";
import type { SystemPromptAssembler } from "./system-prompt-assembler.js";
import type { SessionManager } from "./session-manager.js";
import { formatErrorForUser } from "./error-formatter.js";
import type { HookManager } from "./hook-manager.js";
import type { GatewayConfig } from "./config.js";
import { loadSkills } from "./skills-loader.js";
import { logger } from "./logger.js";
import type { BackendAdapter, BackendEventResult } from "./backends/types.js";

/**
 * Outcome of processing a single gateway event.
 *
 * `error` is populated when the backend reports an error result or when
 * executing the backend throws; otherwise `responseText` carries the reply.
 * Every field is optional: unknown event types and hook events without an
 * instruction produce an empty object.
 */
export interface EventResult {
  responseText?: string;
  targetChannelId?: string;
  sessionId?: string;
  error?: string;
}

/** Callback that receives streamed text together with the channel it belongs to. */
export type OnStreamResult = (text: string, channelId: string) => Promise<void>;

/** Number of retries (after the first attempt) for message events. */
const MESSAGE_MAX_RETRIES = 3;

/** Delay before the first retry of a message event; doubles on each further retry. */
const MESSAGE_RETRY_BASE_DELAY_MS = 5000;

/** Maps a BackendEventResult to the gateway's EventResult, adding the target channel ID. */
export function mapBackendEventResult(backendResult: BackendEventResult, targetChannelId?: string): EventResult {
  if (backendResult.isError) {
    // On error the backend's response text is surfaced as the error message.
    return { error: backendResult.responseText, targetChannelId };
  }
  return {
    responseText: backendResult.responseText,
    targetChannelId,
    sessionId: backendResult.sessionId,
  };
}

/**
 * Returns true when `error` is an Error whose message mentions "session"
 * together with "invalid", "corrupt", "not found" or "expired"
 * (case-insensitive). Non-Error values never match.
 */
function isSessionError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  const msg = error.message.toLowerCase();
  return (
    msg.includes("session") &&
    (msg.includes("invalid") || msg.includes("corrupt") || msg.includes("not found") || msg.includes("expired"))
  );
}

/**
 * Decides whether a failed backend call should be retried.
 *
 * Session errors are never retried. Other Error instances are retried only if
 * their message mentions a timeout, exit, spawn or crash. Thrown values that
 * are not Error instances are treated as transient.
 */
function isTransientError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return true;
  }
  if (isSessionError(error)) {
    return false;
  }
  const msg = error.message.toLowerCase();
  return (
    msg.includes("timed out") ||
    msg.includes("timeout") ||
    msg.includes("exit") ||
    msg.includes("spawn") ||
    msg.includes("crash")
  );
}

/**
 * Runs `fn`, retrying with exponential backoff while `shouldRetry` approves.
 *
 * @param fn - Operation to run; invoked at least once.
 * @param maxRetries - Maximum number of retries after the first attempt.
 *   Negative or NaN values are treated as 0 so that `fn` is always attempted.
 * @param baseDelayMs - Delay before the first retry; doubled for each later retry.
 * @param shouldRetry - Returns true if the given error warrants another attempt.
 * @returns The value `fn` resolves with.
 * @throws The error from the last attempt once retries are exhausted or
 *   `shouldRetry` returns false.
 */
export async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  baseDelayMs: number,
  shouldRetry: (error: unknown) => boolean,
): Promise<T> {
  // Guarantee at least one attempt; otherwise a negative/NaN budget would skip
  // the loop entirely and throw `undefined`.
  const retries = maxRetries > 0 ? maxRetries : 0;
  let lastError: unknown;

  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt >= retries || !shouldRetry(error)) {
        throw error;
      }
      const delay = baseDelayMs * 2 ** attempt;
      logger.info({ attempt: attempt + 1, maxRetries: retries, delayMs: delay }, "Retrying after transient error");
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }

  // Only reachable for a fractional retry budget, where the loop condition
  // fails before the in-loop exhaustion check does.
  throw lastError;
}

/**
 * Dispatches gateway events to the backend: builds the system prompt, routes
 * by event type, tracks per-channel sessions for message events and fires the
 * `agent_begin` / `agent_stop` hooks around non-hook events.
 */
export class AgentRuntime {
  private config: GatewayConfig;
  private backend: BackendAdapter;
  private sessionManager: SessionManager;
  private markdownConfigLoader: MarkdownConfigLoader;
  private systemPromptAssembler: SystemPromptAssembler;
  private hookManager: HookManager;

  constructor(
    config: GatewayConfig,
    backend: BackendAdapter,
    sessionManager: SessionManager,
    markdownConfigLoader: MarkdownConfigLoader,
    systemPromptAssembler: SystemPromptAssembler,
    hookManager: HookManager,
  ) {
    this.config = config;
    this.backend = backend;
    this.sessionManager = sessionManager;
    this.markdownConfigLoader = markdownConfigLoader;
    this.systemPromptAssembler = systemPromptAssembler;
    this.hookManager = hookManager;
  }

  /**
   * Processes one event, wrapping it in the `agent_begin` / `agent_stop`
   * inline hooks unless the event is itself a hook event.
   *
   * @param event - Event to process.
   * @param onStreamResult - Optional sink for streamed text.
   * @returns The event's result.
   * @throws Whatever the hooks or event processing throw.
   */
  async processEvent(event: Event, onStreamResult?: OnStreamResult): Promise<EventResult> {
    const isHookEvent = event.type === "hook";
    if (!isHookEvent) {
      await this.hookManager.fireInline("agent_begin", this);
    }
    try {
      return await this.processEventCore(event, onStreamResult);
    } finally {
      // `finally` fires agent_stop exactly once on both success and failure;
      // separate try/catch copies would fire it twice if the hook itself threw
      // on the success path.
      if (!isHookEvent) {
        await this.hookManager.fireInline("agent_stop", this);
      }
    }
  }

  /**
   * Loads markdown configs and skills from the config directory, assembles the
   * system prompt and routes the event to its type-specific handler. Unknown
   * event types yield an empty result.
   */
  private async processEventCore(event: Event, onStreamResult?: OnStreamResult): Promise<EventResult> {
    const configs = await this.markdownConfigLoader.loadAll(this.config.configDir);
    const skills = await loadSkills(this.config.configDir);
    const systemPrompt = this.systemPromptAssembler.assemble(configs, skills);

    switch (event.type) {
      case "message":
        return this.processMessage(event, systemPrompt, onStreamResult);
      case "heartbeat":
        return this.processHeartbeat(event, systemPrompt, onStreamResult);
      case "cron":
        return this.processCron(event, systemPrompt, onStreamResult);
      case "hook":
        return this.processHook(event, systemPrompt);
      default:
        return {};
    }
  }

  /**
   * Handles a message event: resumes the channel's existing session (if any),
   * executes the prompt with retries on transient errors and records the
   * session ID returned by the backend. Failures are returned as an `error`
   * result rather than thrown; a session error additionally drops the
   * channel's stored session.
   */
  private async processMessage(
    event: Event,
    systemPrompt: string,
    onStreamResult?: OnStreamResult,
  ): Promise<EventResult> {
    const payload = event.payload as MessagePayload;
    const channelId = payload.prompt.channelId;
    const promptText = payload.prompt.text;
    const existingSessionId = this.sessionManager.getSessionId(channelId);
    const streamCallback = onStreamResult
      ? async (text: string) => {
          await onStreamResult(text, channelId);
        }
      : undefined;

    try {
      const backendResult = await withRetry(
        () => this.backend.execute(promptText, systemPrompt, existingSessionId, streamCallback),
        MESSAGE_MAX_RETRIES,
        MESSAGE_RETRY_BASE_DELAY_MS,
        isTransientError,
      );
      if (backendResult.sessionId && channelId) {
        this.sessionManager.setSessionId(channelId, backendResult.sessionId);
      }
      return mapBackendEventResult(backendResult, channelId);
    } catch (error) {
      if (this.isSessionCorrupted(error)) {
        // Forget the stored session so it is not passed to the backend again.
        this.sessionManager.removeSession(channelId);
      }
      return { error: formatErrorForUser(error), targetChannelId: channelId };
    }
  }

  /** Handles a heartbeat event by running its instruction against the configured output channel. */
  private async processHeartbeat(
    event: Event,
    systemPrompt: string,
    onStreamResult?: OnStreamResult,
  ): Promise<EventResult> {
    const payload = event.payload as HeartbeatPayload;
    return this.runScheduledInstruction(payload.instruction, systemPrompt, onStreamResult);
  }

  /** Handles a cron event by running its instruction against the configured output channel. */
  private async processCron(
    event: Event,
    systemPrompt: string,
    onStreamResult?: OnStreamResult,
  ): Promise<EventResult> {
    const payload = event.payload as CronPayload;
    return this.runScheduledInstruction(payload.instruction, systemPrompt, onStreamResult);
  }

  /**
   * Handles a hook event. Hook events without an instruction yield an empty
   * result; otherwise the instruction is executed without a session or stream
   * callback and the result targets the configured output channel.
   */
  private async processHook(event: Event, systemPrompt: string): Promise<EventResult> {
    const payload = event.payload as HookPayload;
    if (!payload.instruction) return {};

    const targetChannelId = this.config.outputChannelId;
    try {
      const backendResult = await this.backend.execute(payload.instruction, systemPrompt);
      return mapBackendEventResult(backendResult, targetChannelId);
    } catch (error) {
      return { error: formatErrorForUser(error), targetChannelId };
    }
  }

  /**
   * Shared implementation for heartbeat and cron events: executes
   * `instruction` with no session ID and targets `config.outputChannelId`.
   * Streaming is only wired up when both a stream sink and an output channel
   * are available. Failures are returned as an `error` result, not thrown.
   */
  private async runScheduledInstruction(
    instruction: string,
    systemPrompt: string,
    onStreamResult?: OnStreamResult,
  ): Promise<EventResult> {
    const targetChannelId = this.config.outputChannelId;
    const streamCallback =
      onStreamResult && targetChannelId
        ? async (text: string) => {
            await onStreamResult(text, targetChannelId);
          }
        : undefined;

    try {
      const backendResult = await this.backend.execute(instruction, systemPrompt, undefined, streamCallback);
      return mapBackendEventResult(backendResult, targetChannelId);
    } catch (error) {
      return { error: formatErrorForUser(error), targetChannelId };
    }
  }

  /** Returns true when `error` is a session error (same check that excludes an error from retries). */
  private isSessionCorrupted(error: unknown): boolean {
    return isSessionError(error);
  }
}