diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 20747b6..4f25afc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,4 +14,5 @@ jobs: node-version: 20 cache: npm - run: npm ci + - run: npx tsc --noEmit - run: npx vitest run diff --git a/src/config.ts b/src/config.ts index ac2ab64..721bcb6 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,12 +1,6 @@ import path from 'path'; export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy'; -export const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN || ''; -export const TELEGRAM_ONLY = process.env.TELEGRAM_ONLY === 'true'; -export const TELEGRAM_BOT_POOL = (process.env.TELEGRAM_BOT_POOL || '') - .split(',') - .map((t) => t.trim()) - .filter(Boolean); export const POLL_INTERVAL = 2000; export const SCHEDULER_POLL_INTERVAL = 60000; diff --git a/src/container-runner.test.ts b/src/container-runner.test.ts new file mode 100644 index 0000000..99c1cc7 --- /dev/null +++ b/src/container-runner.test.ts @@ -0,0 +1,202 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { EventEmitter } from 'events'; +import { PassThrough } from 'stream'; + +// Sentinel markers must match container-runner.ts +const OUTPUT_START_MARKER = '---NANOCLAW_OUTPUT_START---'; +const OUTPUT_END_MARKER = '---NANOCLAW_OUTPUT_END---'; + +// Mock config +vi.mock('./config.js', () => ({ + CONTAINER_IMAGE: 'nanoclaw-agent:latest', + CONTAINER_MAX_OUTPUT_SIZE: 10485760, + CONTAINER_TIMEOUT: 1800000, // 30min + DATA_DIR: '/tmp/nanoclaw-test-data', + GROUPS_DIR: '/tmp/nanoclaw-test-groups', + IDLE_TIMEOUT: 1800000, // 30min +})); + +// Mock logger +vi.mock('./logger.js', () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +// Mock fs +vi.mock('fs', async () => { + const actual = await vi.importActual('fs'); + return { + ...actual, + default: { + ...actual, + existsSync: vi.fn(() => false), + mkdirSync: vi.fn(), + writeFileSync: vi.fn(), + readFileSync: vi.fn(() => ''), + readdirSync: vi.fn(() => []), + statSync: vi.fn(() => ({ isDirectory: () => false })), + copyFileSync: vi.fn(), + }, + }; +}); + +// Mock mount-security +vi.mock('./mount-security.js', () => ({ + validateAdditionalMounts: vi.fn(() => []), +})); + +// Create a controllable fake ChildProcess +function createFakeProcess() { + const proc = new EventEmitter() as EventEmitter & { + stdin: PassThrough; + stdout: PassThrough; + stderr: PassThrough; + kill: ReturnType; + pid: number; + }; + proc.stdin = new PassThrough(); + proc.stdout = new PassThrough(); + proc.stderr = new PassThrough(); + proc.kill = vi.fn(); + proc.pid = 12345; + return proc; +} + +let fakeProc: ReturnType; + +// Mock child_process.spawn +vi.mock('child_process', async () => { + const actual = await vi.importActual('child_process'); + return { + ...actual, + spawn: vi.fn(() => fakeProc), + exec: vi.fn((_cmd: string, _opts: unknown, cb?: (err: Error | null) => void) => { + if (cb) cb(null); + return new EventEmitter(); + }), + }; +}); + +import { runContainerAgent, ContainerOutput } from './container-runner.js'; +import type { RegisteredGroup } from './types.js'; + +const testGroup: RegisteredGroup = { + name: 'Test Group', + folder: 'test-group', + trigger: '@Andy', + added_at: new Date().toISOString(), +}; + +const testInput = { + prompt: 'Hello', + groupFolder: 'test-group', + chatJid: 'test@g.us', + isMain: false, +}; + +function emitOutputMarker(proc: ReturnType, output: ContainerOutput) { + const json = JSON.stringify(output); + proc.stdout.push(`${OUTPUT_START_MARKER}\n${json}\n${OUTPUT_END_MARKER}\n`); +} + +describe('container-runner timeout behavior', () => { + beforeEach(() => { + vi.useFakeTimers(); + fakeProc = createFakeProcess(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('timeout after output resolves as success', async () => { + const onOutput = vi.fn(async () => {}); + const resultPromise = runContainerAgent( + testGroup, + testInput, + () => {}, + onOutput, + ); + + // Emit output with a result + emitOutputMarker(fakeProc, { + status: 'success', + result: 'Here is my response', + newSessionId: 'session-123', + }); + + // Let output processing settle + await vi.advanceTimersByTimeAsync(10); + + // Fire the hard timeout (IDLE_TIMEOUT + 30s = 1830000ms) + await vi.advanceTimersByTimeAsync(1830000); + + // Emit close event (as if container was stopped by the timeout) + fakeProc.emit('close', 137); + + // Let the promise resolve + await vi.advanceTimersByTimeAsync(10); + + const result = await resultPromise; + expect(result.status).toBe('success'); + expect(result.newSessionId).toBe('session-123'); + expect(onOutput).toHaveBeenCalledWith( + expect.objectContaining({ result: 'Here is my response' }), + ); + }); + + it('timeout with no output resolves as error', async () => { + const onOutput = vi.fn(async () => {}); + const resultPromise = runContainerAgent( + testGroup, + testInput, + () => {}, + onOutput, + ); + + // No output emitted — fire the hard timeout + await vi.advanceTimersByTimeAsync(1830000); + + // Emit close event + fakeProc.emit('close', 137); + + await vi.advanceTimersByTimeAsync(10); + + const result = await resultPromise; + expect(result.status).toBe('error'); + expect(result.error).toContain('timed out'); + expect(onOutput).not.toHaveBeenCalled(); + }); + + it('normal exit after output resolves as success', async () => { + const onOutput = vi.fn(async () => {}); + const resultPromise = runContainerAgent( + testGroup, + testInput, + () => {}, + onOutput, + ); + + // Emit output + emitOutputMarker(fakeProc, { + status: 'success', + result: 'Done', + newSessionId: 'session-456', + }); + + await vi.advanceTimersByTimeAsync(10); + + // Normal exit (no timeout) + fakeProc.emit('close', 0); + + await vi.advanceTimersByTimeAsync(10); + + const result = await resultPromise; + expect(result.status).toBe('success'); + expect(result.newSessionId).toBe('session-456'); + }); +}); diff --git a/src/container-runner.ts b/src/container-runner.ts index b6b47f4..25c38b5 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -13,6 +13,7 @@ import { CONTAINER_TIMEOUT, DATA_DIR, GROUPS_DIR, + IDLE_TIMEOUT, } from './config.js'; import { logger } from './logger.js'; import { validateAdditionalMounts } from './mount-security.js'; @@ -324,6 +325,7 @@ export async function runContainerAgent( if (parsed.newSessionId) { newSessionId = parsed.newSessionId; } + hadStreamingOutput = true; // Activity detected — reset the hard timeout resetTimeout(); // Call onOutput for all markers (including null results) @@ -362,7 +364,11 @@ export async function runContainerAgent( }); let timedOut = false; - const timeoutMs = group.containerConfig?.timeout || CONTAINER_TIMEOUT; + let hadStreamingOutput = false; + const configTimeout = group.containerConfig?.timeout || CONTAINER_TIMEOUT; + // Grace period: hard timeout must be at least IDLE_TIMEOUT + 30s so the + // graceful _close sentinel has time to trigger before the hard kill fires. + const timeoutMs = Math.max(configTimeout, IDLE_TIMEOUT + 30_000); const killOnTimeout = () => { timedOut = true; @@ -397,17 +403,36 @@ export async function runContainerAgent( `Container: ${containerName}`, `Duration: ${duration}ms`, `Exit Code: ${code}`, + `Had Streaming Output: ${hadStreamingOutput}`, ].join('\n')); + // Timeout after output = idle cleanup, not failure. + // The agent already sent its response; this is just the + // container being reaped after the idle period expired. + if (hadStreamingOutput) { + logger.info( + { group: group.name, containerName, duration, code }, + 'Container timed out after output (idle cleanup)', + ); + outputChain.then(() => { + resolve({ + status: 'success', + result: null, + newSessionId, + }); + }); + return; + } + logger.error( { group: group.name, containerName, duration, code }, - 'Container timed out', + 'Container timed out with no output', ); resolve({ status: 'error', result: null, - error: `Container timed out after ${group.containerConfig?.timeout || CONTAINER_TIMEOUT}ms`, + error: `Container timed out after ${configTimeout}ms`, }); return; } diff --git a/src/index.ts b/src/index.ts index 5849dbb..999cf97 100644 --- a/src/index.ts +++ b/src/index.ts @@ -167,6 +167,7 @@ async function processGroupMessages(chatJid: string): Promise { await whatsapp.setTyping(chatJid, true); let hadError = false; + let outputSentToUser = false; const output = await runAgent(group, prompt, chatJid, async (result) => { // Streaming output callback — called for each agent result @@ -177,6 +178,7 @@ async function processGroupMessages(chatJid: string): Promise { logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`); if (text) { await whatsapp.sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`); + outputSentToUser = true; } // Only reset idle timer on actual results, not session-update markers (result: null) resetIdleTimer(); @@ -191,6 +193,12 @@ async function processGroupMessages(chatJid: string): Promise { if (idleTimer) clearTimeout(idleTimer); if (output === 'error' || hadError) { + // If we already sent output to the user, don't roll back the cursor — + // the user got their response and re-processing would send duplicates. + if (outputSentToUser) { + logger.warn({ group: group.name }, 'Agent error after output was sent, skipping cursor rollback to prevent duplicates'); + return true; + } // Roll back cursor so retries can re-process these messages lastAgentTimestamp[chatJid] = previousCursor; saveState(); diff --git a/src/telegram.ts b/src/telegram.ts deleted file mode 100644 index fb62131..0000000 --- a/src/telegram.ts +++ /dev/null @@ -1,327 +0,0 @@ -import { Api, Bot } from 'grammy'; -import { - ASSISTANT_NAME, - TRIGGER_PATTERN, -} from './config.js'; -import { - getAllRegisteredGroups, - storeChatMetadata, - storeMessageDirect, -} from './db.js'; -import { logger } from './logger.js'; - -let bot: Bot | null = null; - -// Bot pool for agent teams: send-only Api instances (no polling) -const poolApis: Api[] = []; -// Current display name for each pool bot (from getMe at startup) -const poolBotNames: string[] = []; -// Maps "{groupFolder}:{senderName}" → pool Api index for stable assignment -const senderBotMap = new Map(); -// Tracks which pool indices are already claimed this session -const assignedIndices = new Set(); - - -/** Store a placeholder message for non-text content (photos, voice, etc.) */ -function storeNonTextMessage(ctx: any, placeholder: string): void { - const chatId = `tg:${ctx.chat.id}`; - const registeredGroups = getAllRegisteredGroups(); - if (!registeredGroups[chatId]) return; - - const timestamp = new Date(ctx.message.date * 1000).toISOString(); - const senderName = - ctx.from?.first_name || ctx.from?.username || ctx.from?.id?.toString() || 'Unknown'; - const caption = ctx.message.caption ? ` ${ctx.message.caption}` : ''; - - storeChatMetadata(chatId, timestamp); - storeMessageDirect({ - id: ctx.message.message_id.toString(), - chat_jid: chatId, - sender: ctx.from?.id?.toString() || '', - sender_name: senderName, - content: `${placeholder}${caption}`, - timestamp, - is_from_me: false, - }); -} - -export async function connectTelegram(botToken: string): Promise { - bot = new Bot(botToken); - - // Command to get chat ID (useful for registration) - bot.command('chatid', (ctx) => { - const chatId = ctx.chat.id; - const chatType = ctx.chat.type; - const chatName = - chatType === 'private' - ? ctx.from?.first_name || 'Private' - : (ctx.chat as any).title || 'Unknown'; - - ctx.reply( - `Chat ID: \`tg:${chatId}\`\nName: ${chatName}\nType: ${chatType}`, - { parse_mode: 'Markdown' }, - ); - }); - - // Command to check bot status - bot.command('ping', (ctx) => { - ctx.reply(`${ASSISTANT_NAME} is online.`); - }); - - bot.on('message:text', async (ctx) => { - // Skip commands - if (ctx.message.text.startsWith('/')) return; - - const chatId = `tg:${ctx.chat.id}`; - let content = ctx.message.text; - const timestamp = new Date(ctx.message.date * 1000).toISOString(); - const senderName = - ctx.from?.first_name || - ctx.from?.username || - ctx.from?.id.toString() || - 'Unknown'; - const sender = ctx.from?.id.toString() || ''; - const msgId = ctx.message.message_id.toString(); - - // Determine chat name - const chatName = - ctx.chat.type === 'private' - ? senderName - : (ctx.chat as any).title || chatId; - - // Translate Telegram @bot_username mentions into TRIGGER_PATTERN format. - // Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN - // (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned. - const botUsername = ctx.me?.username?.toLowerCase(); - if (botUsername) { - const entities = ctx.message.entities || []; - const isBotMentioned = entities.some((entity) => { - if (entity.type === 'mention') { - const mentionText = content - .substring(entity.offset, entity.offset + entity.length) - .toLowerCase(); - return mentionText === `@${botUsername}`; - } - return false; - }); - if (isBotMentioned && !TRIGGER_PATTERN.test(content)) { - content = `@${ASSISTANT_NAME} ${content}`; - } - } - - // Store chat metadata for discovery - storeChatMetadata(chatId, timestamp, chatName); - - // Check if this chat is registered - const registeredGroups = getAllRegisteredGroups(); - const group = registeredGroups[chatId]; - - if (!group) { - logger.debug( - { chatId, chatName }, - 'Message from unregistered Telegram chat', - ); - return; - } - - // Store message — startMessageLoop() will pick it up - storeMessageDirect({ - id: msgId, - chat_jid: chatId, - sender, - sender_name: senderName, - content, - timestamp, - is_from_me: false, - }); - - logger.info( - { chatId, chatName, sender: senderName }, - 'Telegram message stored', - ); - }); - - // Handle non-text messages with placeholders so the agent knows something was sent - bot.on('message:photo', (ctx) => storeNonTextMessage(ctx, '[Photo]')); - bot.on('message:video', (ctx) => storeNonTextMessage(ctx, '[Video]')); - bot.on('message:voice', (ctx) => storeNonTextMessage(ctx, '[Voice message]')); - bot.on('message:audio', (ctx) => storeNonTextMessage(ctx, '[Audio]')); - bot.on('message:document', (ctx) => { - const name = ctx.message.document?.file_name || 'file'; - storeNonTextMessage(ctx, `[Document: ${name}]`); - }); - bot.on('message:sticker', (ctx) => { - const emoji = ctx.message.sticker?.emoji || ''; - storeNonTextMessage(ctx, `[Sticker ${emoji}]`); - }); - bot.on('message:location', (ctx) => storeNonTextMessage(ctx, '[Location]')); - bot.on('message:contact', (ctx) => storeNonTextMessage(ctx, '[Contact]')); - - // Handle errors gracefully - bot.catch((err) => { - logger.error({ err: err.message }, 'Telegram bot error'); - }); - - // Start polling - bot.start({ - onStart: (botInfo) => { - logger.info( - { username: botInfo.username, id: botInfo.id }, - 'Telegram bot connected', - ); - console.log(`\n Telegram bot: @${botInfo.username}`); - console.log( - ` Send /chatid to the bot to get a chat's registration ID\n`, - ); - }, - }); -} - -export async function sendTelegramMessage( - chatId: string, - text: string, -): Promise { - if (!bot) { - logger.warn('Telegram bot not initialized'); - return; - } - - try { - const numericId = chatId.replace(/^tg:/, ''); - - // Telegram has a 4096 character limit per message — split if needed - const MAX_LENGTH = 4096; - if (text.length <= MAX_LENGTH) { - await bot.api.sendMessage(numericId, text); - } else { - for (let i = 0; i < text.length; i += MAX_LENGTH) { - await bot.api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH)); - } - } - logger.info({ chatId, length: text.length }, 'Telegram message sent'); - } catch (err) { - logger.error({ chatId, err }, 'Failed to send Telegram message'); - } -} - -export async function setTelegramTyping(chatId: string): Promise { - if (!bot) return; - try { - const numericId = chatId.replace(/^tg:/, ''); - await bot.api.sendChatAction(numericId, 'typing'); - } catch (err) { - logger.debug({ chatId, err }, 'Failed to send Telegram typing indicator'); - } -} - -/** - * Initialize send-only Api instances for the bot pool. - * Each pool bot can send messages but doesn't poll for updates. - */ -export async function initBotPool(tokens: string[]): Promise { - for (const token of tokens) { - try { - const api = new Api(token); - const me = await api.getMe(); - poolApis.push(api); - poolBotNames.push(me.first_name); - logger.info( - { username: me.username, name: me.first_name, id: me.id, poolSize: poolApis.length }, - 'Pool bot initialized', - ); - } catch (err) { - logger.error({ err }, 'Failed to initialize pool bot'); - } - } - if (poolApis.length > 0) { - logger.info({ count: poolApis.length, names: poolBotNames }, 'Telegram bot pool ready'); - } -} - -/** - * Send a message via a pool bot assigned to the given sender name. - * Assignment priority: - * 1. Already assigned to this sender this session → reuse - * 2. A pool bot whose current name matches the sender → claim it (no rename needed) - * 3. First unassigned pool bot → claim and rename - * 4. All claimed → wrap around (reuse + rename) - */ -export async function sendPoolMessage( - chatId: string, - text: string, - sender: string, - groupFolder: string, -): Promise { - if (poolApis.length === 0) { - // No pool bots — fall back to main bot - await sendTelegramMessage(chatId, text); - return; - } - - const key = `${groupFolder}:${sender}`; - let idx = senderBotMap.get(key); - if (idx === undefined) { - // 1. Check if any pool bot already has this name (from a previous session) - const nameMatch = poolBotNames.findIndex( - (name, i) => name === sender && !assignedIndices.has(i), - ); - if (nameMatch !== -1) { - idx = nameMatch; - assignedIndices.add(idx); - senderBotMap.set(key, idx); - logger.info({ sender, groupFolder, poolIndex: idx }, 'Matched pool bot by name'); - } else { - // 2. Pick first unassigned bot - let freeIdx = -1; - for (let i = 0; i < poolApis.length; i++) { - if (!assignedIndices.has(i)) { - freeIdx = i; - break; - } - } - // 3. All assigned — wrap around to least recently used - if (freeIdx === -1) freeIdx = assignedIndices.size % poolApis.length; - - idx = freeIdx; - assignedIndices.add(idx); - senderBotMap.set(key, idx); - // Rename the bot, then wait for Telegram to propagate - try { - await poolApis[idx].setMyName(sender); - poolBotNames[idx] = sender; - await new Promise((r) => setTimeout(r, 2000)); - logger.info({ sender, groupFolder, poolIndex: idx }, 'Assigned and renamed pool bot'); - } catch (err) { - logger.warn({ sender, err }, 'Failed to rename pool bot (sending anyway)'); - } - } - } - - const api = poolApis[idx]; - try { - const numericId = chatId.replace(/^tg:/, ''); - const MAX_LENGTH = 4096; - if (text.length <= MAX_LENGTH) { - await api.sendMessage(numericId, text); - } else { - for (let i = 0; i < text.length; i += MAX_LENGTH) { - await api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH)); - } - } - logger.info({ chatId, sender, poolIndex: idx, length: text.length }, 'Pool message sent'); - } catch (err) { - logger.error({ chatId, sender, err }, 'Failed to send pool message'); - } -} - -export function isTelegramConnected(): boolean { - return bot !== null; -} - -export function stopTelegram(): void { - if (bot) { - bot.stop(); - bot = null; - logger.info('Telegram bot stopped'); - } -}