From 8eb80d4ed0d9e1e13f5cd2932d0c343751a00b5e Mon Sep 17 00:00:00 2001 From: gavrielc Date: Wed, 11 Feb 2026 17:25:42 +0200 Subject: [PATCH] fix: prevent infinite message replay on container timeout (#164) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Container timeout and idle timeout both fire at 30min, racing the graceful shutdown. The hard kill returns error status, rolling back the message cursor even though output was already sent — causing duplicate messages indefinitely. - Grace period: hard timeout is now IDLE_TIMEOUT + 30s minimum - Timeout after output resolves as success (idle cleanup, not failure) - Don't roll back cursor if output was already sent to user - Remove src/telegram.ts and config vars (added to PR #156 by mistake) - Add typecheck step to CI workflow - Add container-runner timeout behavior tests Co-Authored-By: Claude Opus 4.6 --- .github/workflows/test.yml | 1 + src/config.ts | 6 - src/container-runner.test.ts | 202 ++++++++++++++++++++++ src/container-runner.ts | 31 +++- src/index.ts | 8 + src/telegram.ts | 327 ----------------------------------- 6 files changed, 239 insertions(+), 336 deletions(-) create mode 100644 src/container-runner.test.ts delete mode 100644 src/telegram.ts diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 20747b6..4f25afc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,4 +14,5 @@ jobs: node-version: 20 cache: npm - run: npm ci + - run: npx tsc --noEmit - run: npx vitest run diff --git a/src/config.ts b/src/config.ts index ac2ab64..721bcb6 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,12 +1,6 @@ import path from 'path'; export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy'; -export const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN || ''; -export const TELEGRAM_ONLY = process.env.TELEGRAM_ONLY === 'true'; -export const TELEGRAM_BOT_POOL = (process.env.TELEGRAM_BOT_POOL || '') - .split(',') - .map((t) => t.trim()) - .filter(Boolean); export const POLL_INTERVAL = 2000; export const SCHEDULER_POLL_INTERVAL = 60000; diff --git a/src/container-runner.test.ts b/src/container-runner.test.ts new file mode 100644 index 0000000..99c1cc7 --- /dev/null +++ b/src/container-runner.test.ts @@ -0,0 +1,202 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { EventEmitter } from 'events'; +import { PassThrough } from 'stream'; + +// Sentinel markers must match container-runner.ts +const OUTPUT_START_MARKER = '---NANOCLAW_OUTPUT_START---'; +const OUTPUT_END_MARKER = '---NANOCLAW_OUTPUT_END---'; + +// Mock config +vi.mock('./config.js', () => ({ + CONTAINER_IMAGE: 'nanoclaw-agent:latest', + CONTAINER_MAX_OUTPUT_SIZE: 10485760, + CONTAINER_TIMEOUT: 1800000, // 30min + DATA_DIR: '/tmp/nanoclaw-test-data', + GROUPS_DIR: '/tmp/nanoclaw-test-groups', + IDLE_TIMEOUT: 1800000, // 30min +})); + +// Mock logger +vi.mock('./logger.js', () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +// Mock fs +vi.mock('fs', async () => { + const actual = await vi.importActual('fs'); + return { + ...actual, + default: { + ...actual, + existsSync: vi.fn(() => false), + mkdirSync: vi.fn(), + writeFileSync: vi.fn(), + readFileSync: vi.fn(() => ''), + readdirSync: vi.fn(() => []), + statSync: vi.fn(() => ({ isDirectory: () => false })), + copyFileSync: vi.fn(), + }, + }; +}); + +// Mock mount-security +vi.mock('./mount-security.js', () => ({ + validateAdditionalMounts: vi.fn(() => []), +})); + +// Create a controllable fake ChildProcess +function createFakeProcess() { + const proc = new EventEmitter() as EventEmitter & { + stdin: PassThrough; + stdout: PassThrough; + stderr: PassThrough; + kill: ReturnType; + pid: number; + }; + proc.stdin = new PassThrough(); + proc.stdout = new PassThrough(); + proc.stderr = new PassThrough(); + proc.kill = vi.fn(); + proc.pid = 12345; + return proc; +} + +let fakeProc: ReturnType; + +// Mock child_process.spawn +vi.mock('child_process', async () => { + const actual = await vi.importActual('child_process'); + return { + ...actual, + spawn: vi.fn(() => fakeProc), + exec: vi.fn((_cmd: string, _opts: unknown, cb?: (err: Error | null) => void) => { + if (cb) cb(null); + return new EventEmitter(); + }), + }; +}); + +import { runContainerAgent, ContainerOutput } from './container-runner.js'; +import type { RegisteredGroup } from './types.js'; + +const testGroup: RegisteredGroup = { + name: 'Test Group', + folder: 'test-group', + trigger: '@Andy', + added_at: new Date().toISOString(), +}; + +const testInput = { + prompt: 'Hello', + groupFolder: 'test-group', + chatJid: 'test@g.us', + isMain: false, +}; + +function emitOutputMarker(proc: ReturnType, output: ContainerOutput) { + const json = JSON.stringify(output); + proc.stdout.push(`${OUTPUT_START_MARKER}\n${json}\n${OUTPUT_END_MARKER}\n`); +} + +describe('container-runner timeout behavior', () => { + beforeEach(() => { + vi.useFakeTimers(); + fakeProc = createFakeProcess(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('timeout after output resolves as success', async () => { + const onOutput = vi.fn(async () => {}); + const resultPromise = runContainerAgent( + testGroup, + testInput, + () => {}, + onOutput, + ); + + // Emit output with a result + emitOutputMarker(fakeProc, { + status: 'success', + result: 'Here is my response', + newSessionId: 'session-123', + }); + + // Let output processing settle + await vi.advanceTimersByTimeAsync(10); + + // Fire the hard timeout (IDLE_TIMEOUT + 30s = 1830000ms) + await vi.advanceTimersByTimeAsync(1830000); + + // Emit close event (as if container was stopped by the timeout) + fakeProc.emit('close', 137); + + // Let the promise resolve + await vi.advanceTimersByTimeAsync(10); + + const result = await resultPromise; + expect(result.status).toBe('success'); + expect(result.newSessionId).toBe('session-123'); + expect(onOutput).toHaveBeenCalledWith( + expect.objectContaining({ result: 'Here is my response' }), + ); + }); + + it('timeout with no output resolves as error', async () => { + const onOutput = vi.fn(async () => {}); + const resultPromise = runContainerAgent( + testGroup, + testInput, + () => {}, + onOutput, + ); + + // No output emitted — fire the hard timeout + await vi.advanceTimersByTimeAsync(1830000); + + // Emit close event + fakeProc.emit('close', 137); + + await vi.advanceTimersByTimeAsync(10); + + const result = await resultPromise; + expect(result.status).toBe('error'); + expect(result.error).toContain('timed out'); + expect(onOutput).not.toHaveBeenCalled(); + }); + + it('normal exit after output resolves as success', async () => { + const onOutput = vi.fn(async () => {}); + const resultPromise = runContainerAgent( + testGroup, + testInput, + () => {}, + onOutput, + ); + + // Emit output + emitOutputMarker(fakeProc, { + status: 'success', + result: 'Done', + newSessionId: 'session-456', + }); + + await vi.advanceTimersByTimeAsync(10); + + // Normal exit (no timeout) + fakeProc.emit('close', 0); + + await vi.advanceTimersByTimeAsync(10); + + const result = await resultPromise; + expect(result.status).toBe('success'); + expect(result.newSessionId).toBe('session-456'); + }); +}); diff --git a/src/container-runner.ts b/src/container-runner.ts index b6b47f4..25c38b5 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -13,6 +13,7 @@ import { CONTAINER_TIMEOUT, DATA_DIR, GROUPS_DIR, + IDLE_TIMEOUT, } from './config.js'; import { logger } from './logger.js'; import { validateAdditionalMounts } from './mount-security.js'; @@ -324,6 +325,7 @@ export async function runContainerAgent( if (parsed.newSessionId) { newSessionId = parsed.newSessionId; } + hadStreamingOutput = true; // Activity detected — reset the hard timeout resetTimeout(); // Call onOutput for all markers (including null results) @@ -362,7 +364,11 @@ export async function runContainerAgent( }); let timedOut = false; - const timeoutMs = group.containerConfig?.timeout || CONTAINER_TIMEOUT; + let hadStreamingOutput = false; + const configTimeout = group.containerConfig?.timeout || CONTAINER_TIMEOUT; + // Grace period: hard timeout must be at least IDLE_TIMEOUT + 30s so the + // graceful _close sentinel has time to trigger before the hard kill fires. + const timeoutMs = Math.max(configTimeout, IDLE_TIMEOUT + 30_000); const killOnTimeout = () => { timedOut = true; @@ -397,17 +403,36 @@ export async function runContainerAgent( `Container: ${containerName}`, `Duration: ${duration}ms`, `Exit Code: ${code}`, + `Had Streaming Output: ${hadStreamingOutput}`, ].join('\n')); + // Timeout after output = idle cleanup, not failure. + // The agent already sent its response; this is just the + // container being reaped after the idle period expired. + if (hadStreamingOutput) { + logger.info( + { group: group.name, containerName, duration, code }, + 'Container timed out after output (idle cleanup)', + ); + outputChain.then(() => { + resolve({ + status: 'success', + result: null, + newSessionId, + }); + }); + return; + } + logger.error( { group: group.name, containerName, duration, code }, - 'Container timed out', + 'Container timed out with no output', ); resolve({ status: 'error', result: null, - error: `Container timed out after ${group.containerConfig?.timeout || CONTAINER_TIMEOUT}ms`, + error: `Container timed out after ${configTimeout}ms`, }); return; } diff --git a/src/index.ts b/src/index.ts index 5849dbb..999cf97 100644 --- a/src/index.ts +++ b/src/index.ts @@ -167,6 +167,7 @@ async function processGroupMessages(chatJid: string): Promise { await whatsapp.setTyping(chatJid, true); let hadError = false; + let outputSentToUser = false; const output = await runAgent(group, prompt, chatJid, async (result) => { // Streaming output callback — called for each agent result @@ -177,6 +178,7 @@ async function processGroupMessages(chatJid: string): Promise { logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`); if (text) { await whatsapp.sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`); + outputSentToUser = true; } // Only reset idle timer on actual results, not session-update markers (result: null) resetIdleTimer(); @@ -191,6 +193,12 @@ async function processGroupMessages(chatJid: string): Promise { if (idleTimer) clearTimeout(idleTimer); if (output === 'error' || hadError) { + // If we already sent output to the user, don't roll back the cursor — + // the user got their response and re-processing would send duplicates. + if (outputSentToUser) { + logger.warn({ group: group.name }, 'Agent error after output was sent, skipping cursor rollback to prevent duplicates'); + return true; + } // Roll back cursor so retries can re-process these messages lastAgentTimestamp[chatJid] = previousCursor; saveState(); diff --git a/src/telegram.ts b/src/telegram.ts deleted file mode 100644 index fb62131..0000000 --- a/src/telegram.ts +++ /dev/null @@ -1,327 +0,0 @@ -import { Api, Bot } from 'grammy'; -import { - ASSISTANT_NAME, - TRIGGER_PATTERN, -} from './config.js'; -import { - getAllRegisteredGroups, - storeChatMetadata, - storeMessageDirect, -} from './db.js'; -import { logger } from './logger.js'; - -let bot: Bot | null = null; - -// Bot pool for agent teams: send-only Api instances (no polling) -const poolApis: Api[] = []; -// Current display name for each pool bot (from getMe at startup) -const poolBotNames: string[] = []; -// Maps "{groupFolder}:{senderName}" → pool Api index for stable assignment -const senderBotMap = new Map(); -// Tracks which pool indices are already claimed this session -const assignedIndices = new Set(); - - -/** Store a placeholder message for non-text content (photos, voice, etc.) */ -function storeNonTextMessage(ctx: any, placeholder: string): void { - const chatId = `tg:${ctx.chat.id}`; - const registeredGroups = getAllRegisteredGroups(); - if (!registeredGroups[chatId]) return; - - const timestamp = new Date(ctx.message.date * 1000).toISOString(); - const senderName = - ctx.from?.first_name || ctx.from?.username || ctx.from?.id?.toString() || 'Unknown'; - const caption = ctx.message.caption ? ` ${ctx.message.caption}` : ''; - - storeChatMetadata(chatId, timestamp); - storeMessageDirect({ - id: ctx.message.message_id.toString(), - chat_jid: chatId, - sender: ctx.from?.id?.toString() || '', - sender_name: senderName, - content: `${placeholder}${caption}`, - timestamp, - is_from_me: false, - }); -} - -export async function connectTelegram(botToken: string): Promise { - bot = new Bot(botToken); - - // Command to get chat ID (useful for registration) - bot.command('chatid', (ctx) => { - const chatId = ctx.chat.id; - const chatType = ctx.chat.type; - const chatName = - chatType === 'private' - ? ctx.from?.first_name || 'Private' - : (ctx.chat as any).title || 'Unknown'; - - ctx.reply( - `Chat ID: \`tg:${chatId}\`\nName: ${chatName}\nType: ${chatType}`, - { parse_mode: 'Markdown' }, - ); - }); - - // Command to check bot status - bot.command('ping', (ctx) => { - ctx.reply(`${ASSISTANT_NAME} is online.`); - }); - - bot.on('message:text', async (ctx) => { - // Skip commands - if (ctx.message.text.startsWith('/')) return; - - const chatId = `tg:${ctx.chat.id}`; - let content = ctx.message.text; - const timestamp = new Date(ctx.message.date * 1000).toISOString(); - const senderName = - ctx.from?.first_name || - ctx.from?.username || - ctx.from?.id.toString() || - 'Unknown'; - const sender = ctx.from?.id.toString() || ''; - const msgId = ctx.message.message_id.toString(); - - // Determine chat name - const chatName = - ctx.chat.type === 'private' - ? senderName - : (ctx.chat as any).title || chatId; - - // Translate Telegram @bot_username mentions into TRIGGER_PATTERN format. - // Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN - // (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned. - const botUsername = ctx.me?.username?.toLowerCase(); - if (botUsername) { - const entities = ctx.message.entities || []; - const isBotMentioned = entities.some((entity) => { - if (entity.type === 'mention') { - const mentionText = content - .substring(entity.offset, entity.offset + entity.length) - .toLowerCase(); - return mentionText === `@${botUsername}`; - } - return false; - }); - if (isBotMentioned && !TRIGGER_PATTERN.test(content)) { - content = `@${ASSISTANT_NAME} ${content}`; - } - } - - // Store chat metadata for discovery - storeChatMetadata(chatId, timestamp, chatName); - - // Check if this chat is registered - const registeredGroups = getAllRegisteredGroups(); - const group = registeredGroups[chatId]; - - if (!group) { - logger.debug( - { chatId, chatName }, - 'Message from unregistered Telegram chat', - ); - return; - } - - // Store message — startMessageLoop() will pick it up - storeMessageDirect({ - id: msgId, - chat_jid: chatId, - sender, - sender_name: senderName, - content, - timestamp, - is_from_me: false, - }); - - logger.info( - { chatId, chatName, sender: senderName }, - 'Telegram message stored', - ); - }); - - // Handle non-text messages with placeholders so the agent knows something was sent - bot.on('message:photo', (ctx) => storeNonTextMessage(ctx, '[Photo]')); - bot.on('message:video', (ctx) => storeNonTextMessage(ctx, '[Video]')); - bot.on('message:voice', (ctx) => storeNonTextMessage(ctx, '[Voice message]')); - bot.on('message:audio', (ctx) => storeNonTextMessage(ctx, '[Audio]')); - bot.on('message:document', (ctx) => { - const name = ctx.message.document?.file_name || 'file'; - storeNonTextMessage(ctx, `[Document: ${name}]`); - }); - bot.on('message:sticker', (ctx) => { - const emoji = ctx.message.sticker?.emoji || ''; - storeNonTextMessage(ctx, `[Sticker ${emoji}]`); - }); - bot.on('message:location', (ctx) => storeNonTextMessage(ctx, '[Location]')); - bot.on('message:contact', (ctx) => storeNonTextMessage(ctx, '[Contact]')); - - // Handle errors gracefully - bot.catch((err) => { - logger.error({ err: err.message }, 'Telegram bot error'); - }); - - // Start polling - bot.start({ - onStart: (botInfo) => { - logger.info( - { username: botInfo.username, id: botInfo.id }, - 'Telegram bot connected', - ); - console.log(`\n Telegram bot: @${botInfo.username}`); - console.log( - ` Send /chatid to the bot to get a chat's registration ID\n`, - ); - }, - }); -} - -export async function sendTelegramMessage( - chatId: string, - text: string, -): Promise { - if (!bot) { - logger.warn('Telegram bot not initialized'); - return; - } - - try { - const numericId = chatId.replace(/^tg:/, ''); - - // Telegram has a 4096 character limit per message — split if needed - const MAX_LENGTH = 4096; - if (text.length <= MAX_LENGTH) { - await bot.api.sendMessage(numericId, text); - } else { - for (let i = 0; i < text.length; i += MAX_LENGTH) { - await bot.api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH)); - } - } - logger.info({ chatId, length: text.length }, 'Telegram message sent'); - } catch (err) { - logger.error({ chatId, err }, 'Failed to send Telegram message'); - } -} - -export async function setTelegramTyping(chatId: string): Promise { - if (!bot) return; - try { - const numericId = chatId.replace(/^tg:/, ''); - await bot.api.sendChatAction(numericId, 'typing'); - } catch (err) { - logger.debug({ chatId, err }, 'Failed to send Telegram typing indicator'); - } -} - -/** - * Initialize send-only Api instances for the bot pool. - * Each pool bot can send messages but doesn't poll for updates. - */ -export async function initBotPool(tokens: string[]): Promise { - for (const token of tokens) { - try { - const api = new Api(token); - const me = await api.getMe(); - poolApis.push(api); - poolBotNames.push(me.first_name); - logger.info( - { username: me.username, name: me.first_name, id: me.id, poolSize: poolApis.length }, - 'Pool bot initialized', - ); - } catch (err) { - logger.error({ err }, 'Failed to initialize pool bot'); - } - } - if (poolApis.length > 0) { - logger.info({ count: poolApis.length, names: poolBotNames }, 'Telegram bot pool ready'); - } -} - -/** - * Send a message via a pool bot assigned to the given sender name. - * Assignment priority: - * 1. Already assigned to this sender this session → reuse - * 2. A pool bot whose current name matches the sender → claim it (no rename needed) - * 3. First unassigned pool bot → claim and rename - * 4. All claimed → wrap around (reuse + rename) - */ -export async function sendPoolMessage( - chatId: string, - text: string, - sender: string, - groupFolder: string, -): Promise { - if (poolApis.length === 0) { - // No pool bots — fall back to main bot - await sendTelegramMessage(chatId, text); - return; - } - - const key = `${groupFolder}:${sender}`; - let idx = senderBotMap.get(key); - if (idx === undefined) { - // 1. Check if any pool bot already has this name (from a previous session) - const nameMatch = poolBotNames.findIndex( - (name, i) => name === sender && !assignedIndices.has(i), - ); - if (nameMatch !== -1) { - idx = nameMatch; - assignedIndices.add(idx); - senderBotMap.set(key, idx); - logger.info({ sender, groupFolder, poolIndex: idx }, 'Matched pool bot by name'); - } else { - // 2. Pick first unassigned bot - let freeIdx = -1; - for (let i = 0; i < poolApis.length; i++) { - if (!assignedIndices.has(i)) { - freeIdx = i; - break; - } - } - // 3. All assigned — wrap around to least recently used - if (freeIdx === -1) freeIdx = assignedIndices.size % poolApis.length; - - idx = freeIdx; - assignedIndices.add(idx); - senderBotMap.set(key, idx); - // Rename the bot, then wait for Telegram to propagate - try { - await poolApis[idx].setMyName(sender); - poolBotNames[idx] = sender; - await new Promise((r) => setTimeout(r, 2000)); - logger.info({ sender, groupFolder, poolIndex: idx }, 'Assigned and renamed pool bot'); - } catch (err) { - logger.warn({ sender, err }, 'Failed to rename pool bot (sending anyway)'); - } - } - } - - const api = poolApis[idx]; - try { - const numericId = chatId.replace(/^tg:/, ''); - const MAX_LENGTH = 4096; - if (text.length <= MAX_LENGTH) { - await api.sendMessage(numericId, text); - } else { - for (let i = 0; i < text.length; i += MAX_LENGTH) { - await api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH)); - } - } - logger.info({ chatId, sender, poolIndex: idx, length: text.length }, 'Pool message sent'); - } catch (err) { - logger.error({ chatId, sender, err }, 'Failed to send pool message'); - } -} - -export function isTelegramConnected(): boolean { - return bot !== null; -} - -export function stopTelegram(): void { - if (bot) { - bot.stop(); - bot = null; - logger.info('Telegram bot stopped'); - } -}