diff --git a/references/MyOwnOpenClaw.png b/references/MyOwnOpenClaw.png new file mode 100644 index 0000000..cbe4d5a Binary files /dev/null and b/references/MyOwnOpenClaw.png differ diff --git a/src/agent-runtime.ts b/src/agent-runtime.ts index 46bdb8a..0c35241 100644 --- a/src/agent-runtime.ts +++ b/src/agent-runtime.ts @@ -1,6 +1,7 @@ import { spawn } from "node:child_process"; import { writeFile, unlink } from "node:fs/promises"; import { join } from "node:path"; +import path from "node:path"; import { tmpdir } from "node:os"; import { randomUUID } from "node:crypto"; import type { Event, MessagePayload, HeartbeatPayload, CronPayload, HookPayload } from "./event-queue.js"; @@ -18,6 +19,8 @@ export interface EventResult { error?: string; } +export type OnStreamResult = (text: string, channelId: string) => Promise; + interface ClaudeJsonResponse { type: string; subtype?: string; @@ -51,9 +54,7 @@ export class AgentRuntime { this.hookManager = hookManager; } - async processEvent(event: Event): Promise { - // Skip inline hooks for hook events to prevent infinite recursion - // (fireInline calls processEvent which would call fireInline again) + async processEvent(event: Event, onStreamResult?: OnStreamResult): Promise { const isHookEvent = event.type === "hook"; if (!isHookEvent) { @@ -61,7 +62,7 @@ export class AgentRuntime { } try { - const result = await this.processEventCore(event); + const result = await this.processEventCore(event, onStreamResult); if (!isHookEvent) { await this.hookManager.fireInline("agent_stop", this); } @@ -74,17 +75,17 @@ export class AgentRuntime { } } - private async processEventCore(event: Event): Promise { + private async processEventCore(event: Event, onStreamResult?: OnStreamResult): Promise { const configs = await this.markdownConfigLoader.loadAll(this.config.configDir); const systemPrompt = this.systemPromptAssembler.assemble(configs); switch (event.type) { case "message": - return this.processMessage(event, systemPrompt); + return this.processMessage(event, systemPrompt, onStreamResult); case "heartbeat": - return this.processHeartbeat(event, systemPrompt); + return this.processHeartbeat(event, systemPrompt, onStreamResult); case "cron": - return this.processCron(event, systemPrompt); + return this.processCron(event, systemPrompt, onStreamResult); case "hook": return this.processHook(event, systemPrompt); default: @@ -92,14 +93,18 @@ export class AgentRuntime { } } - private async processMessage(event: Event, systemPrompt: string): Promise { + private async processMessage(event: Event, systemPrompt: string, onStreamResult?: OnStreamResult): Promise { const payload = event.payload as MessagePayload; const channelId = payload.prompt.channelId; const promptText = payload.prompt.text; const existingSessionId = this.sessionManager.getSessionId(channelId); + const streamCallback = onStreamResult + ? (text: string) => onStreamResult(text, channelId) + : undefined; + try { - const response = await this.executeClaude(promptText, systemPrompt, existingSessionId); + const response = await this.executeClaude(promptText, systemPrompt, existingSessionId, streamCallback); if (response.session_id && channelId) { this.sessionManager.setSessionId(channelId, response.session_id); @@ -118,20 +123,28 @@ export class AgentRuntime { } } - private async processHeartbeat(event: Event, systemPrompt: string): Promise { + private async processHeartbeat(event: Event, systemPrompt: string, onStreamResult?: OnStreamResult): Promise { const payload = event.payload as HeartbeatPayload; + const targetChannelId = this.config.outputChannelId; + const streamCallback = onStreamResult && targetChannelId + ? (text: string) => onStreamResult(text, targetChannelId) + : undefined; try { - const response = await this.executeClaude(payload.instruction, systemPrompt); + const response = await this.executeClaude(payload.instruction, systemPrompt, undefined, streamCallback); return { responseText: response.result, targetChannelId: this.config.outputChannelId }; } catch (error) { return { error: formatErrorForUser(error), targetChannelId: this.config.outputChannelId }; } } - private async processCron(event: Event, systemPrompt: string): Promise { + private async processCron(event: Event, systemPrompt: string, onStreamResult?: OnStreamResult): Promise { const payload = event.payload as CronPayload; + const targetChannelId = this.config.outputChannelId; + const streamCallback = onStreamResult && targetChannelId + ? (text: string) => onStreamResult(text, targetChannelId) + : undefined; try { - const response = await this.executeClaude(payload.instruction, systemPrompt); + const response = await this.executeClaude(payload.instruction, systemPrompt, undefined, streamCallback); return { responseText: response.result, targetChannelId: this.config.outputChannelId }; } catch (error) { return { error: formatErrorForUser(error), targetChannelId: this.config.outputChannelId }; @@ -153,13 +166,13 @@ export class AgentRuntime { promptText: string, systemPrompt: string, sessionId?: string, + onResult?: (text: string) => Promise, ): Promise { - // Write system prompt to a temp file to avoid CLI arg length limits const tmpFile = join(tmpdir(), `aetheel-prompt-${randomUUID()}.txt`); await writeFile(tmpFile, systemPrompt, "utf-8"); try { - return await this.runClaude(promptText, tmpFile, sessionId); + return await this.runClaude(promptText, tmpFile, sessionId, onResult); } finally { unlink(tmpFile).catch(() => {}); } @@ -169,48 +182,78 @@ export class AgentRuntime { promptText: string, systemPromptFile: string, sessionId?: string, + onResult?: (text: string) => Promise, ): Promise { return new Promise((resolve, reject) => { - // Build args — keep it minimal and match what works on the CLI directly const args: string[] = [ "-p", promptText, "--output-format", "json", "--dangerously-skip-permissions", "--append-system-prompt-file", systemPromptFile, - "--verbose", ]; if (sessionId) { args.push("--resume", sessionId); } - // --allowedTools expects each tool as a separate quoted arg for (const tool of this.config.allowedTools) { args.push("--allowedTools", tool); } args.push("--max-turns", "25"); - console.log(`[DEBUG] Spawning: ${this.config.claudeCliPath} args=${JSON.stringify(args.slice(0, 8))}... (${args.length} total)`); + const configDir = path.resolve(this.config.configDir); + console.log(`[DEBUG] Spawning: ${this.config.claudeCliPath} cwd=${configDir} args=${JSON.stringify(args.slice(0, 8))}... (${args.length} total)`); const child = spawn(this.config.claudeCliPath, args, { stdio: ["ignore", "pipe", "pipe"], + cwd: configDir, }); let stdout = ""; let stderr = ""; + let parsedSessionId: string | undefined; + let lastResultText = ""; + let streamedResults = false; + + // Parse JSON objects from stdout as they arrive for streaming + let parseBuffer = ""; child.stdout.on("data", (data: Buffer) => { - stdout += data.toString(); + const chunk = data.toString(); + stdout += chunk; + parseBuffer += chunk; + + // Try to parse complete JSON objects from the buffer + // The output is a JSON array like [{...},{...},...] or newline-delimited + const lines = parseBuffer.split("\n"); + parseBuffer = lines.pop() || ""; // Keep incomplete last line in buffer + + for (const line of lines) { + const cleaned = line.replace(/^\[/, "").replace(/,?\]$/, "").replace(/^,/, "").trim(); + if (!cleaned) continue; + try { + const obj = JSON.parse(cleaned); + if (obj.type === "system" && obj.subtype === "init" && obj.session_id) { + parsedSessionId = obj.session_id; + } + if (obj.type === "result" && obj.result) { + lastResultText = obj.result; + if (onResult) { + streamedResults = true; + onResult(obj.result).catch((err) => + console.error("[DEBUG] Stream callback error:", err) + ); + } + } + } catch { + // Not valid JSON yet + } + } }); child.stderr.on("data", (data: Buffer) => { - const chunk = data.toString(); - stderr += chunk; - // Log stderr in real-time so we can see what's happening - if (chunk.trim()) { - console.log(`[DEBUG] Claude stderr: ${chunk.trim().slice(0, 200)}`); - } + stderr += data.toString(); }); const timer = setTimeout(() => { @@ -221,76 +264,52 @@ export class AgentRuntime { child.on("close", (code) => { clearTimeout(timer); - - console.log(`[DEBUG] Claude CLI exited: code=${code}, stdout=${stdout.length} chars`); - if (stdout.length > 0) { - console.log(`[DEBUG] Claude stdout preview: ${stdout.slice(0, 300)}`); - } + console.log(`[DEBUG] Claude CLI exited: code=${code}, stdout=${stdout.length} chars, streamed=${streamedResults}`); if (code !== 0 && code !== null) { reject(new Error(`Claude CLI error (exit ${code}): ${stderr.slice(0, 500) || "unknown error"}`)); return; } - try { - // The CLI with --output-format json returns newline-delimited JSON objects. - // We need to find the "result" type message which contains the actual response. - const lines = stdout.trim().split("\n"); - let resultText = ""; - let sessionId: string | undefined; - - for (const line of lines) { - try { - // Each line might be a JSON object or part of a JSON array - const cleaned = line.replace(/^\[/, "").replace(/,?\]$/, "").replace(/^,/, "").trim(); - if (!cleaned) continue; - const obj = JSON.parse(cleaned); - - if (obj.type === "system" && obj.subtype === "init" && obj.session_id) { - sessionId = obj.session_id; - } - - if (obj.type === "result" && obj.result) { - resultText = obj.result; - } - } catch { - // Skip unparseable lines + // Final parse of any remaining buffer + if (parseBuffer.trim()) { + try { + const cleaned = parseBuffer.replace(/^\[/, "").replace(/,?\]$/, "").replace(/^,/, "").trim(); + const obj = JSON.parse(cleaned); + if (obj.type === "system" && obj.subtype === "init" && obj.session_id) { + parsedSessionId = obj.session_id; } - } + if (obj.type === "result" && obj.result) { + lastResultText = obj.result; + } + } catch { /* ignore */ } + } - // If we couldn't parse individual lines, try parsing the whole thing as a JSON array - if (!resultText) { - try { - const arr = JSON.parse(stdout.trim()); - if (Array.isArray(arr)) { - for (const obj of arr) { - if (obj.type === "system" && obj.subtype === "init" && obj.session_id) { - sessionId = obj.session_id; - } - if (obj.type === "result" && obj.result) { - resultText = obj.result; - } + // If we didn't get results from line-by-line parsing, try the full output + if (!lastResultText) { + try { + const arr = JSON.parse(stdout.trim()); + if (Array.isArray(arr)) { + for (const obj of arr) { + if (obj.type === "system" && obj.subtype === "init" && obj.session_id) { + parsedSessionId = obj.session_id; + } + if (obj.type === "result" && obj.result) { + lastResultText = obj.result; } } - } catch { - // Not a JSON array either } - } - - console.log(`[DEBUG] Parsed result: ${resultText.length} chars, session=${sessionId ?? "none"}`); - - resolve({ - type: "result", - result: resultText || undefined, - session_id: sessionId, - is_error: false, - }); - } catch { - resolve({ - type: "result", - result: stdout.trim(), - }); + } catch { /* ignore */ } } + + console.log(`[DEBUG] Parsed: result=${lastResultText.length} chars, session=${parsedSessionId ?? "none"}`); + + resolve({ + type: "result", + result: streamedResults ? undefined : lastResultText || undefined, + session_id: parsedSessionId, + is_error: false, + }); }); child.on("error", (err) => { diff --git a/src/gateway-core.ts b/src/gateway-core.ts index 4d78344..84c6285 100644 --- a/src/gateway-core.ts +++ b/src/gateway-core.ts @@ -1,3 +1,4 @@ +import { join } from "node:path"; import { loadConfig, type GatewayConfig } from "./config.js"; import { DiscordBot, type Prompt } from "./discord-bot.js"; import { EventQueue, type Event, type MessagePayload } from "./event-queue.js"; @@ -44,7 +45,7 @@ export class GatewayCore { this.eventQueue = new EventQueue(this.config.maxQueueDepth); // 5. Initialize AgentRuntime with all dependencies - this.sessionManager = new SessionManager(); + this.sessionManager = new SessionManager(join(this.config.configDir, "sessions.json")); this.markdownConfigLoader = new MarkdownConfigLoader(); const systemPromptAssembler = new SystemPromptAssembler(); this.hookManager = new HookManager(); @@ -94,9 +95,18 @@ export class GatewayCore { this.eventQueue.onEvent(async (event: Event) => { console.log(`[DEBUG] Processing event: type=${event.type}, id=${event.id}`); try { - const result = await this.agentRuntime.processEvent(event); + // Streaming callback — sends results to Discord as they arrive + const onStreamResult = async (text: string, channelId: string) => { + const chunks = splitMessage(text); + for (const chunk of chunks) { + await this.discordBot.sendMessage(channelId, chunk); + } + }; + + const result = await this.agentRuntime.processEvent(event, onStreamResult); console.log(`[DEBUG] Event result: responseText=${result.responseText?.length ?? 0} chars, error=${result.error ?? "none"}`); + // Only send if not already streamed if (result.responseText && result.targetChannelId) { const chunks = splitMessage(result.responseText); for (const chunk of chunks) { diff --git a/src/session-manager.ts b/src/session-manager.ts index 11680ff..54daf68 100644 --- a/src/session-manager.ts +++ b/src/session-manager.ts @@ -1,5 +1,16 @@ +import { readFileSync, writeFileSync, mkdirSync } from "node:fs"; +import { join, dirname } from "node:path"; + export class SessionManager { private bindings = new Map(); + private persistPath: string | null = null; + + constructor(persistPath?: string) { + if (persistPath) { + this.persistPath = persistPath; + this.loadFromDisk(); + } + } getSessionId(channelId: string): string | undefined { return this.bindings.get(channelId); @@ -7,13 +18,44 @@ export class SessionManager { setSessionId(channelId: string, sessionId: string): void { this.bindings.set(channelId, sessionId); + this.saveToDisk(); } removeSession(channelId: string): void { this.bindings.delete(channelId); + this.saveToDisk(); } clear(): void { this.bindings.clear(); + this.saveToDisk(); + } + + private loadFromDisk(): void { + if (!this.persistPath) return; + try { + const data = readFileSync(this.persistPath, "utf-8"); + const parsed = JSON.parse(data) as Record; + for (const [k, v] of Object.entries(parsed)) { + this.bindings.set(k, v); + } + console.log(`Sessions loaded: ${this.bindings.size} channel(s)`); + } catch { + // File doesn't exist yet — that's fine + } + } + + private saveToDisk(): void { + if (!this.persistPath) return; + try { + mkdirSync(dirname(this.persistPath), { recursive: true }); + const obj: Record = {}; + for (const [k, v] of this.bindings) { + obj[k] = v; + } + writeFileSync(this.persistPath, JSON.stringify(obj, null, 2), "utf-8"); + } catch (err) { + console.error("Failed to persist sessions:", err); + } } } diff --git a/vitest.config.ts b/vitest.config.ts index e2ec332..7cd1cf8 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -3,5 +3,6 @@ import { defineConfig } from "vitest/config"; export default defineConfig({ test: { globals: true, + exclude: ["**/node_modules/**", "**/references/**", "**/dist/**"], }, });