Initial commit: Discord-Claude Gateway with event-driven agent runtime
This commit is contained in:
BIN
references/MyOwnOpenClaw.png
Normal file
BIN
references/MyOwnOpenClaw.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 472 KiB |
@@ -1,6 +1,7 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { writeFile, unlink } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
import path from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { Event, MessagePayload, HeartbeatPayload, CronPayload, HookPayload } from "./event-queue.js";
|
||||
@@ -18,6 +19,8 @@ export interface EventResult {
|
||||
error?: string;
|
||||
}
|
||||
|
||||
export type OnStreamResult = (text: string, channelId: string) => Promise<void>;
|
||||
|
||||
interface ClaudeJsonResponse {
|
||||
type: string;
|
||||
subtype?: string;
|
||||
@@ -51,9 +54,7 @@ export class AgentRuntime {
|
||||
this.hookManager = hookManager;
|
||||
}
|
||||
|
||||
async processEvent(event: Event): Promise<EventResult> {
|
||||
// Skip inline hooks for hook events to prevent infinite recursion
|
||||
// (fireInline calls processEvent which would call fireInline again)
|
||||
async processEvent(event: Event, onStreamResult?: OnStreamResult): Promise<EventResult> {
|
||||
const isHookEvent = event.type === "hook";
|
||||
|
||||
if (!isHookEvent) {
|
||||
@@ -61,7 +62,7 @@ export class AgentRuntime {
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.processEventCore(event);
|
||||
const result = await this.processEventCore(event, onStreamResult);
|
||||
if (!isHookEvent) {
|
||||
await this.hookManager.fireInline("agent_stop", this);
|
||||
}
|
||||
@@ -74,17 +75,17 @@ export class AgentRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
private async processEventCore(event: Event): Promise<EventResult> {
|
||||
private async processEventCore(event: Event, onStreamResult?: OnStreamResult): Promise<EventResult> {
|
||||
const configs = await this.markdownConfigLoader.loadAll(this.config.configDir);
|
||||
const systemPrompt = this.systemPromptAssembler.assemble(configs);
|
||||
|
||||
switch (event.type) {
|
||||
case "message":
|
||||
return this.processMessage(event, systemPrompt);
|
||||
return this.processMessage(event, systemPrompt, onStreamResult);
|
||||
case "heartbeat":
|
||||
return this.processHeartbeat(event, systemPrompt);
|
||||
return this.processHeartbeat(event, systemPrompt, onStreamResult);
|
||||
case "cron":
|
||||
return this.processCron(event, systemPrompt);
|
||||
return this.processCron(event, systemPrompt, onStreamResult);
|
||||
case "hook":
|
||||
return this.processHook(event, systemPrompt);
|
||||
default:
|
||||
@@ -92,14 +93,18 @@ export class AgentRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
private async processMessage(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
private async processMessage(event: Event, systemPrompt: string, onStreamResult?: OnStreamResult): Promise<EventResult> {
|
||||
const payload = event.payload as MessagePayload;
|
||||
const channelId = payload.prompt.channelId;
|
||||
const promptText = payload.prompt.text;
|
||||
const existingSessionId = this.sessionManager.getSessionId(channelId);
|
||||
|
||||
const streamCallback = onStreamResult
|
||||
? (text: string) => onStreamResult(text, channelId)
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
const response = await this.executeClaude(promptText, systemPrompt, existingSessionId);
|
||||
const response = await this.executeClaude(promptText, systemPrompt, existingSessionId, streamCallback);
|
||||
|
||||
if (response.session_id && channelId) {
|
||||
this.sessionManager.setSessionId(channelId, response.session_id);
|
||||
@@ -118,20 +123,28 @@ export class AgentRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
private async processHeartbeat(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
private async processHeartbeat(event: Event, systemPrompt: string, onStreamResult?: OnStreamResult): Promise<EventResult> {
|
||||
const payload = event.payload as HeartbeatPayload;
|
||||
const targetChannelId = this.config.outputChannelId;
|
||||
const streamCallback = onStreamResult && targetChannelId
|
||||
? (text: string) => onStreamResult(text, targetChannelId)
|
||||
: undefined;
|
||||
try {
|
||||
const response = await this.executeClaude(payload.instruction, systemPrompt);
|
||||
const response = await this.executeClaude(payload.instruction, systemPrompt, undefined, streamCallback);
|
||||
return { responseText: response.result, targetChannelId: this.config.outputChannelId };
|
||||
} catch (error) {
|
||||
return { error: formatErrorForUser(error), targetChannelId: this.config.outputChannelId };
|
||||
}
|
||||
}
|
||||
|
||||
private async processCron(event: Event, systemPrompt: string): Promise<EventResult> {
|
||||
private async processCron(event: Event, systemPrompt: string, onStreamResult?: OnStreamResult): Promise<EventResult> {
|
||||
const payload = event.payload as CronPayload;
|
||||
const targetChannelId = this.config.outputChannelId;
|
||||
const streamCallback = onStreamResult && targetChannelId
|
||||
? (text: string) => onStreamResult(text, targetChannelId)
|
||||
: undefined;
|
||||
try {
|
||||
const response = await this.executeClaude(payload.instruction, systemPrompt);
|
||||
const response = await this.executeClaude(payload.instruction, systemPrompt, undefined, streamCallback);
|
||||
return { responseText: response.result, targetChannelId: this.config.outputChannelId };
|
||||
} catch (error) {
|
||||
return { error: formatErrorForUser(error), targetChannelId: this.config.outputChannelId };
|
||||
@@ -153,13 +166,13 @@ export class AgentRuntime {
|
||||
promptText: string,
|
||||
systemPrompt: string,
|
||||
sessionId?: string,
|
||||
onResult?: (text: string) => Promise<void>,
|
||||
): Promise<ClaudeJsonResponse> {
|
||||
// Write system prompt to a temp file to avoid CLI arg length limits
|
||||
const tmpFile = join(tmpdir(), `aetheel-prompt-${randomUUID()}.txt`);
|
||||
await writeFile(tmpFile, systemPrompt, "utf-8");
|
||||
|
||||
try {
|
||||
return await this.runClaude(promptText, tmpFile, sessionId);
|
||||
return await this.runClaude(promptText, tmpFile, sessionId, onResult);
|
||||
} finally {
|
||||
unlink(tmpFile).catch(() => {});
|
||||
}
|
||||
@@ -169,48 +182,78 @@ export class AgentRuntime {
|
||||
promptText: string,
|
||||
systemPromptFile: string,
|
||||
sessionId?: string,
|
||||
onResult?: (text: string) => Promise<void>,
|
||||
): Promise<ClaudeJsonResponse> {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Build args — keep it minimal and match what works on the CLI directly
|
||||
const args: string[] = [
|
||||
"-p", promptText,
|
||||
"--output-format", "json",
|
||||
"--dangerously-skip-permissions",
|
||||
"--append-system-prompt-file", systemPromptFile,
|
||||
"--verbose",
|
||||
];
|
||||
|
||||
if (sessionId) {
|
||||
args.push("--resume", sessionId);
|
||||
}
|
||||
|
||||
// --allowedTools expects each tool as a separate quoted arg
|
||||
for (const tool of this.config.allowedTools) {
|
||||
args.push("--allowedTools", tool);
|
||||
}
|
||||
|
||||
args.push("--max-turns", "25");
|
||||
|
||||
console.log(`[DEBUG] Spawning: ${this.config.claudeCliPath} args=${JSON.stringify(args.slice(0, 8))}... (${args.length} total)`);
|
||||
const configDir = path.resolve(this.config.configDir);
|
||||
console.log(`[DEBUG] Spawning: ${this.config.claudeCliPath} cwd=${configDir} args=${JSON.stringify(args.slice(0, 8))}... (${args.length} total)`);
|
||||
|
||||
const child = spawn(this.config.claudeCliPath, args, {
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
cwd: configDir,
|
||||
});
|
||||
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
let parsedSessionId: string | undefined;
|
||||
let lastResultText = "";
|
||||
let streamedResults = false;
|
||||
|
||||
// Parse JSON objects from stdout as they arrive for streaming
|
||||
let parseBuffer = "";
|
||||
|
||||
child.stdout.on("data", (data: Buffer) => {
|
||||
stdout += data.toString();
|
||||
const chunk = data.toString();
|
||||
stdout += chunk;
|
||||
parseBuffer += chunk;
|
||||
|
||||
// Try to parse complete JSON objects from the buffer
|
||||
// The output is a JSON array like [{...},{...},...] or newline-delimited
|
||||
const lines = parseBuffer.split("\n");
|
||||
parseBuffer = lines.pop() || ""; // Keep incomplete last line in buffer
|
||||
|
||||
for (const line of lines) {
|
||||
const cleaned = line.replace(/^\[/, "").replace(/,?\]$/, "").replace(/^,/, "").trim();
|
||||
if (!cleaned) continue;
|
||||
try {
|
||||
const obj = JSON.parse(cleaned);
|
||||
if (obj.type === "system" && obj.subtype === "init" && obj.session_id) {
|
||||
parsedSessionId = obj.session_id;
|
||||
}
|
||||
if (obj.type === "result" && obj.result) {
|
||||
lastResultText = obj.result;
|
||||
if (onResult) {
|
||||
streamedResults = true;
|
||||
onResult(obj.result).catch((err) =>
|
||||
console.error("[DEBUG] Stream callback error:", err)
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Not valid JSON yet
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
child.stderr.on("data", (data: Buffer) => {
|
||||
const chunk = data.toString();
|
||||
stderr += chunk;
|
||||
// Log stderr in real-time so we can see what's happening
|
||||
if (chunk.trim()) {
|
||||
console.log(`[DEBUG] Claude stderr: ${chunk.trim().slice(0, 200)}`);
|
||||
}
|
||||
stderr += data.toString();
|
||||
});
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
@@ -221,76 +264,52 @@ export class AgentRuntime {
|
||||
|
||||
child.on("close", (code) => {
|
||||
clearTimeout(timer);
|
||||
|
||||
console.log(`[DEBUG] Claude CLI exited: code=${code}, stdout=${stdout.length} chars`);
|
||||
if (stdout.length > 0) {
|
||||
console.log(`[DEBUG] Claude stdout preview: ${stdout.slice(0, 300)}`);
|
||||
}
|
||||
console.log(`[DEBUG] Claude CLI exited: code=${code}, stdout=${stdout.length} chars, streamed=${streamedResults}`);
|
||||
|
||||
if (code !== 0 && code !== null) {
|
||||
reject(new Error(`Claude CLI error (exit ${code}): ${stderr.slice(0, 500) || "unknown error"}`));
|
||||
return;
|
||||
}
|
||||
|
||||
// Final parse of any remaining buffer
|
||||
if (parseBuffer.trim()) {
|
||||
try {
|
||||
// The CLI with --output-format json returns newline-delimited JSON objects.
|
||||
// We need to find the "result" type message which contains the actual response.
|
||||
const lines = stdout.trim().split("\n");
|
||||
let resultText = "";
|
||||
let sessionId: string | undefined;
|
||||
|
||||
for (const line of lines) {
|
||||
try {
|
||||
// Each line might be a JSON object or part of a JSON array
|
||||
const cleaned = line.replace(/^\[/, "").replace(/,?\]$/, "").replace(/^,/, "").trim();
|
||||
if (!cleaned) continue;
|
||||
const cleaned = parseBuffer.replace(/^\[/, "").replace(/,?\]$/, "").replace(/^,/, "").trim();
|
||||
const obj = JSON.parse(cleaned);
|
||||
|
||||
if (obj.type === "system" && obj.subtype === "init" && obj.session_id) {
|
||||
sessionId = obj.session_id;
|
||||
parsedSessionId = obj.session_id;
|
||||
}
|
||||
|
||||
if (obj.type === "result" && obj.result) {
|
||||
resultText = obj.result;
|
||||
}
|
||||
} catch {
|
||||
// Skip unparseable lines
|
||||
lastResultText = obj.result;
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
|
||||
// If we couldn't parse individual lines, try parsing the whole thing as a JSON array
|
||||
if (!resultText) {
|
||||
// If we didn't get results from line-by-line parsing, try the full output
|
||||
if (!lastResultText) {
|
||||
try {
|
||||
const arr = JSON.parse(stdout.trim());
|
||||
if (Array.isArray(arr)) {
|
||||
for (const obj of arr) {
|
||||
if (obj.type === "system" && obj.subtype === "init" && obj.session_id) {
|
||||
sessionId = obj.session_id;
|
||||
parsedSessionId = obj.session_id;
|
||||
}
|
||||
if (obj.type === "result" && obj.result) {
|
||||
resultText = obj.result;
|
||||
lastResultText = obj.result;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Not a JSON array either
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
|
||||
console.log(`[DEBUG] Parsed result: ${resultText.length} chars, session=${sessionId ?? "none"}`);
|
||||
console.log(`[DEBUG] Parsed: result=${lastResultText.length} chars, session=${parsedSessionId ?? "none"}`);
|
||||
|
||||
resolve({
|
||||
type: "result",
|
||||
result: resultText || undefined,
|
||||
session_id: sessionId,
|
||||
result: streamedResults ? undefined : lastResultText || undefined,
|
||||
session_id: parsedSessionId,
|
||||
is_error: false,
|
||||
});
|
||||
} catch {
|
||||
resolve({
|
||||
type: "result",
|
||||
result: stdout.trim(),
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
child.on("error", (err) => {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { join } from "node:path";
|
||||
import { loadConfig, type GatewayConfig } from "./config.js";
|
||||
import { DiscordBot, type Prompt } from "./discord-bot.js";
|
||||
import { EventQueue, type Event, type MessagePayload } from "./event-queue.js";
|
||||
@@ -44,7 +45,7 @@ export class GatewayCore {
|
||||
this.eventQueue = new EventQueue(this.config.maxQueueDepth);
|
||||
|
||||
// 5. Initialize AgentRuntime with all dependencies
|
||||
this.sessionManager = new SessionManager();
|
||||
this.sessionManager = new SessionManager(join(this.config.configDir, "sessions.json"));
|
||||
this.markdownConfigLoader = new MarkdownConfigLoader();
|
||||
const systemPromptAssembler = new SystemPromptAssembler();
|
||||
this.hookManager = new HookManager();
|
||||
@@ -94,9 +95,18 @@ export class GatewayCore {
|
||||
this.eventQueue.onEvent(async (event: Event) => {
|
||||
console.log(`[DEBUG] Processing event: type=${event.type}, id=${event.id}`);
|
||||
try {
|
||||
const result = await this.agentRuntime.processEvent(event);
|
||||
// Streaming callback — sends results to Discord as they arrive
|
||||
const onStreamResult = async (text: string, channelId: string) => {
|
||||
const chunks = splitMessage(text);
|
||||
for (const chunk of chunks) {
|
||||
await this.discordBot.sendMessage(channelId, chunk);
|
||||
}
|
||||
};
|
||||
|
||||
const result = await this.agentRuntime.processEvent(event, onStreamResult);
|
||||
console.log(`[DEBUG] Event result: responseText=${result.responseText?.length ?? 0} chars, error=${result.error ?? "none"}`);
|
||||
|
||||
// Only send if not already streamed
|
||||
if (result.responseText && result.targetChannelId) {
|
||||
const chunks = splitMessage(result.responseText);
|
||||
for (const chunk of chunks) {
|
||||
|
||||
@@ -1,5 +1,16 @@
|
||||
import { readFileSync, writeFileSync, mkdirSync } from "node:fs";
|
||||
import { join, dirname } from "node:path";
|
||||
|
||||
export class SessionManager {
|
||||
private bindings = new Map<string, string>();
|
||||
private persistPath: string | null = null;
|
||||
|
||||
constructor(persistPath?: string) {
|
||||
if (persistPath) {
|
||||
this.persistPath = persistPath;
|
||||
this.loadFromDisk();
|
||||
}
|
||||
}
|
||||
|
||||
getSessionId(channelId: string): string | undefined {
|
||||
return this.bindings.get(channelId);
|
||||
@@ -7,13 +18,44 @@ export class SessionManager {
|
||||
|
||||
setSessionId(channelId: string, sessionId: string): void {
|
||||
this.bindings.set(channelId, sessionId);
|
||||
this.saveToDisk();
|
||||
}
|
||||
|
||||
removeSession(channelId: string): void {
|
||||
this.bindings.delete(channelId);
|
||||
this.saveToDisk();
|
||||
}
|
||||
|
||||
clear(): void {
|
||||
this.bindings.clear();
|
||||
this.saveToDisk();
|
||||
}
|
||||
|
||||
private loadFromDisk(): void {
|
||||
if (!this.persistPath) return;
|
||||
try {
|
||||
const data = readFileSync(this.persistPath, "utf-8");
|
||||
const parsed = JSON.parse(data) as Record<string, string>;
|
||||
for (const [k, v] of Object.entries(parsed)) {
|
||||
this.bindings.set(k, v);
|
||||
}
|
||||
console.log(`Sessions loaded: ${this.bindings.size} channel(s)`);
|
||||
} catch {
|
||||
// File doesn't exist yet — that's fine
|
||||
}
|
||||
}
|
||||
|
||||
private saveToDisk(): void {
|
||||
if (!this.persistPath) return;
|
||||
try {
|
||||
mkdirSync(dirname(this.persistPath), { recursive: true });
|
||||
const obj: Record<string, string> = {};
|
||||
for (const [k, v] of this.bindings) {
|
||||
obj[k] = v;
|
||||
}
|
||||
writeFileSync(this.persistPath, JSON.stringify(obj, null, 2), "utf-8");
|
||||
} catch (err) {
|
||||
console.error("Failed to persist sessions:", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,5 +3,6 @@ import { defineConfig } from "vitest/config";
|
||||
export default defineConfig({
|
||||
test: {
|
||||
globals: true,
|
||||
exclude: ["**/node_modules/**", "**/references/**", "**/dist/**"],
|
||||
},
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user