Initial commit: Discord-Claude Gateway with event-driven agent runtime
This commit is contained in:
235
src/agent-runtime.ts
Normal file
235
src/agent-runtime.ts
Normal file
@@ -0,0 +1,235 @@
|
||||
import { query } from "@anthropic-ai/claude-agent-sdk";
|
||||
import type { Event, MessagePayload, HeartbeatPayload, CronPayload, HookPayload } from "./event-queue.js";
|
||||
import type { MarkdownConfigLoader } from "./markdown-config-loader.js";
|
||||
import type { SystemPromptAssembler } from "./system-prompt-assembler.js";
|
||||
import type { SessionManager } from "./session-manager.js";
|
||||
import { formatErrorForUser } from "./error-formatter.js";
|
||||
import type { HookManager } from "./hook-manager.js";
|
||||
import type { GatewayConfig } from "./config.js";
|
||||
|
||||
export interface EventResult {
|
||||
responseText?: string;
|
||||
targetChannelId?: string;
|
||||
sessionId?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
export class AgentRuntime {
|
||||
private config: GatewayConfig;
|
||||
private sessionManager: SessionManager;
|
||||
private markdownConfigLoader: MarkdownConfigLoader;
|
||||
private systemPromptAssembler: SystemPromptAssembler;
|
||||
private hookManager: HookManager;
|
||||
|
||||
constructor(
|
||||
config: GatewayConfig,
|
||||
sessionManager: SessionManager,
|
||||
markdownConfigLoader: MarkdownConfigLoader,
|
||||
systemPromptAssembler: SystemPromptAssembler,
|
||||
hookManager: HookManager,
|
||||
) {
|
||||
this.config = config;
|
||||
this.sessionManager = sessionManager;
|
||||
this.markdownConfigLoader = markdownConfigLoader;
|
||||
this.systemPromptAssembler = systemPromptAssembler;
|
||||
this.hookManager = hookManager;
|
||||
}
|
||||
|
||||
async processEvent(event: Event): Promise<EventResult> {
|
||||
// Fire agent_begin inline hook
|
||||
await this.hookManager.fireInline("agent_begin", this);
|
||||
|
||||
try {
|
||||
const result = await this.processEventCore(event);
|
||||
|
||||
// Fire agent_stop inline hook
|
||||
await this.hookManager.fireInline("agent_stop", this);
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
// Fire agent_stop even on error
|
||||
await this.hookManager.fireInline("agent_stop", this);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private async processEventCore(event: Event): Promise<EventResult> {
|
||||
// Read all markdown configs fresh
|
||||
const configs = await this.markdownConfigLoader.loadAll(this.config.configDir);
|
||||
const systemPrompt = this.systemPromptAssembler.assemble(configs);
|
||||
|
||||
switch (event.type) {
|
||||
case "message":
|
||||
return this.processMessage(event, systemPrompt);
|
||||
case "heartbeat":
|
||||
return this.processHeartbeat(event, systemPrompt);
|
||||
case "cron":
|
||||
return this.processCron(event, systemPrompt);
|
||||
case "hook":
|
||||
return this.processHook(event, systemPrompt);
|
||||
default:
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
private async processMessage(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
const payload = event.payload as MessagePayload;
|
||||
const channelId = payload.prompt.channelId;
|
||||
const promptText = payload.prompt.text;
|
||||
const existingSessionId = this.sessionManager.getSessionId(channelId);
|
||||
|
||||
try {
|
||||
const responseText = await this.executeQuery(
|
||||
promptText,
|
||||
systemPrompt,
|
||||
channelId,
|
||||
existingSessionId,
|
||||
);
|
||||
|
||||
return {
|
||||
responseText,
|
||||
targetChannelId: channelId,
|
||||
sessionId: this.sessionManager.getSessionId(channelId),
|
||||
};
|
||||
} catch (error) {
|
||||
// If session is corrupted, remove the binding
|
||||
if (this.isSessionCorrupted(error)) {
|
||||
this.sessionManager.removeSession(channelId);
|
||||
}
|
||||
|
||||
const errorMessage = formatErrorForUser(error);
|
||||
return {
|
||||
error: errorMessage,
|
||||
targetChannelId: channelId,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private async processHeartbeat(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
const payload = event.payload as HeartbeatPayload;
|
||||
const targetChannelId = this.config.outputChannelId;
|
||||
|
||||
try {
|
||||
const responseText = await this.executeQuery(
|
||||
payload.instruction,
|
||||
systemPrompt,
|
||||
);
|
||||
|
||||
return {
|
||||
responseText,
|
||||
targetChannelId,
|
||||
};
|
||||
} catch (error) {
|
||||
const errorMessage = formatErrorForUser(error);
|
||||
return { error: errorMessage, targetChannelId };
|
||||
}
|
||||
}
|
||||
|
||||
private async processCron(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
const payload = event.payload as CronPayload;
|
||||
const targetChannelId = this.config.outputChannelId;
|
||||
|
||||
try {
|
||||
const responseText = await this.executeQuery(
|
||||
payload.instruction,
|
||||
systemPrompt,
|
||||
);
|
||||
|
||||
return {
|
||||
responseText,
|
||||
targetChannelId,
|
||||
};
|
||||
} catch (error) {
|
||||
const errorMessage = formatErrorForUser(error);
|
||||
return { error: errorMessage, targetChannelId };
|
||||
}
|
||||
}
|
||||
|
||||
private async processHook(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
const payload = event.payload as HookPayload;
|
||||
|
||||
// If no instruction, return empty result (no query needed)
|
||||
if (!payload.instruction) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const targetChannelId = this.config.outputChannelId;
|
||||
|
||||
try {
|
||||
const responseText = await this.executeQuery(
|
||||
payload.instruction,
|
||||
systemPrompt,
|
||||
);
|
||||
|
||||
return {
|
||||
responseText,
|
||||
targetChannelId,
|
||||
};
|
||||
} catch (error) {
|
||||
const errorMessage = formatErrorForUser(error);
|
||||
return { error: errorMessage, targetChannelId };
|
||||
}
|
||||
}
|
||||
|
||||
private async executeQuery(
|
||||
promptText: string,
|
||||
systemPrompt: string,
|
||||
channelId?: string,
|
||||
existingSessionId?: string,
|
||||
): Promise<string> {
|
||||
const options: Record<string, unknown> = {
|
||||
allowedTools: this.config.allowedTools,
|
||||
permissionMode: this.config.permissionMode,
|
||||
systemPrompt,
|
||||
};
|
||||
|
||||
if (existingSessionId) {
|
||||
options.resume = existingSessionId;
|
||||
}
|
||||
|
||||
const queryPromise = this.consumeStream(
|
||||
query({ prompt: promptText, options }),
|
||||
channelId,
|
||||
);
|
||||
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
setTimeout(() => {
|
||||
reject(new Error("Query timed out"));
|
||||
}, this.config.queryTimeoutMs);
|
||||
});
|
||||
|
||||
return Promise.race([queryPromise, timeoutPromise]);
|
||||
}
|
||||
|
||||
private async consumeStream(
|
||||
stream: AsyncIterable<any>,
|
||||
channelId?: string,
|
||||
): Promise<string> {
|
||||
let resultText = "";
|
||||
|
||||
for await (const message of stream) {
|
||||
// Store session_id from init messages
|
||||
if (message.type === "system" && message.subtype === "init" && channelId) {
|
||||
this.sessionManager.setSessionId(channelId, message.session_id);
|
||||
}
|
||||
|
||||
// Collect result text
|
||||
if ("result" in message && typeof message.result === "string") {
|
||||
resultText += message.result;
|
||||
}
|
||||
}
|
||||
|
||||
return resultText;
|
||||
}
|
||||
|
||||
private isSessionCorrupted(error: unknown): boolean {
|
||||
if (error instanceof Error) {
|
||||
const msg = error.message.toLowerCase();
|
||||
return (
|
||||
msg.includes("session") &&
|
||||
(msg.includes("invalid") || msg.includes("corrupt") || msg.includes("not found") || msg.includes("expired"))
|
||||
);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user