fix: prevent infinite message replay on container timeout (#164)

Container timeout and idle timeout both fire at 30min, racing the
graceful shutdown. The hard kill returns error status, rolling back
the message cursor even though output was already sent — causing
duplicate messages indefinitely.

- Grace period: hard timeout is now IDLE_TIMEOUT + 30s minimum
- Timeout after output resolves as success (idle cleanup, not failure)
- Don't roll back cursor if output was already sent to user
- Remove src/telegram.ts and config vars (added to PR #156 by mistake)
- Add typecheck step to CI workflow
- Add container-runner timeout behavior tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-02-11 17:25:42 +02:00
parent 2b56fecfdc
commit 8eb80d4ed0
6 changed files with 239 additions and 336 deletions

View File

@@ -14,4 +14,5 @@ jobs:
node-version: 20 node-version: 20
cache: npm cache: npm
- run: npm ci - run: npm ci
- run: npx tsc --noEmit
- run: npx vitest run - run: npx vitest run

View File

@@ -1,12 +1,6 @@
import path from 'path'; import path from 'path';
export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy'; export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy';
export const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN || '';
export const TELEGRAM_ONLY = process.env.TELEGRAM_ONLY === 'true';
export const TELEGRAM_BOT_POOL = (process.env.TELEGRAM_BOT_POOL || '')
.split(',')
.map((t) => t.trim())
.filter(Boolean);
export const POLL_INTERVAL = 2000; export const POLL_INTERVAL = 2000;
export const SCHEDULER_POLL_INTERVAL = 60000; export const SCHEDULER_POLL_INTERVAL = 60000;

View File

@@ -0,0 +1,202 @@
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { EventEmitter } from 'events';
import { PassThrough } from 'stream';
// Sentinel markers must match container-runner.ts
const OUTPUT_START_MARKER = '---NANOCLAW_OUTPUT_START---';
const OUTPUT_END_MARKER = '---NANOCLAW_OUTPUT_END---';
// Mock config
vi.mock('./config.js', () => ({
CONTAINER_IMAGE: 'nanoclaw-agent:latest',
CONTAINER_MAX_OUTPUT_SIZE: 10485760,
CONTAINER_TIMEOUT: 1800000, // 30min
DATA_DIR: '/tmp/nanoclaw-test-data',
GROUPS_DIR: '/tmp/nanoclaw-test-groups',
IDLE_TIMEOUT: 1800000, // 30min
}));
// Mock logger
vi.mock('./logger.js', () => ({
logger: {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
}));
// Mock fs
vi.mock('fs', async () => {
const actual = await vi.importActual<typeof import('fs')>('fs');
return {
...actual,
default: {
...actual,
existsSync: vi.fn(() => false),
mkdirSync: vi.fn(),
writeFileSync: vi.fn(),
readFileSync: vi.fn(() => ''),
readdirSync: vi.fn(() => []),
statSync: vi.fn(() => ({ isDirectory: () => false })),
copyFileSync: vi.fn(),
},
};
});
// Mock mount-security
vi.mock('./mount-security.js', () => ({
validateAdditionalMounts: vi.fn(() => []),
}));
// Create a controllable fake ChildProcess
function createFakeProcess() {
const proc = new EventEmitter() as EventEmitter & {
stdin: PassThrough;
stdout: PassThrough;
stderr: PassThrough;
kill: ReturnType<typeof vi.fn>;
pid: number;
};
proc.stdin = new PassThrough();
proc.stdout = new PassThrough();
proc.stderr = new PassThrough();
proc.kill = vi.fn();
proc.pid = 12345;
return proc;
}
let fakeProc: ReturnType<typeof createFakeProcess>;
// Mock child_process.spawn
vi.mock('child_process', async () => {
const actual = await vi.importActual<typeof import('child_process')>('child_process');
return {
...actual,
spawn: vi.fn(() => fakeProc),
exec: vi.fn((_cmd: string, _opts: unknown, cb?: (err: Error | null) => void) => {
if (cb) cb(null);
return new EventEmitter();
}),
};
});
import { runContainerAgent, ContainerOutput } from './container-runner.js';
import type { RegisteredGroup } from './types.js';
const testGroup: RegisteredGroup = {
name: 'Test Group',
folder: 'test-group',
trigger: '@Andy',
added_at: new Date().toISOString(),
};
const testInput = {
prompt: 'Hello',
groupFolder: 'test-group',
chatJid: 'test@g.us',
isMain: false,
};
function emitOutputMarker(proc: ReturnType<typeof createFakeProcess>, output: ContainerOutput) {
const json = JSON.stringify(output);
proc.stdout.push(`${OUTPUT_START_MARKER}\n${json}\n${OUTPUT_END_MARKER}\n`);
}
describe('container-runner timeout behavior', () => {
beforeEach(() => {
vi.useFakeTimers();
fakeProc = createFakeProcess();
});
afterEach(() => {
vi.useRealTimers();
});
it('timeout after output resolves as success', async () => {
const onOutput = vi.fn(async () => {});
const resultPromise = runContainerAgent(
testGroup,
testInput,
() => {},
onOutput,
);
// Emit output with a result
emitOutputMarker(fakeProc, {
status: 'success',
result: 'Here is my response',
newSessionId: 'session-123',
});
// Let output processing settle
await vi.advanceTimersByTimeAsync(10);
// Fire the hard timeout (IDLE_TIMEOUT + 30s = 1830000ms)
await vi.advanceTimersByTimeAsync(1830000);
// Emit close event (as if container was stopped by the timeout)
fakeProc.emit('close', 137);
// Let the promise resolve
await vi.advanceTimersByTimeAsync(10);
const result = await resultPromise;
expect(result.status).toBe('success');
expect(result.newSessionId).toBe('session-123');
expect(onOutput).toHaveBeenCalledWith(
expect.objectContaining({ result: 'Here is my response' }),
);
});
it('timeout with no output resolves as error', async () => {
const onOutput = vi.fn(async () => {});
const resultPromise = runContainerAgent(
testGroup,
testInput,
() => {},
onOutput,
);
// No output emitted — fire the hard timeout
await vi.advanceTimersByTimeAsync(1830000);
// Emit close event
fakeProc.emit('close', 137);
await vi.advanceTimersByTimeAsync(10);
const result = await resultPromise;
expect(result.status).toBe('error');
expect(result.error).toContain('timed out');
expect(onOutput).not.toHaveBeenCalled();
});
it('normal exit after output resolves as success', async () => {
const onOutput = vi.fn(async () => {});
const resultPromise = runContainerAgent(
testGroup,
testInput,
() => {},
onOutput,
);
// Emit output
emitOutputMarker(fakeProc, {
status: 'success',
result: 'Done',
newSessionId: 'session-456',
});
await vi.advanceTimersByTimeAsync(10);
// Normal exit (no timeout)
fakeProc.emit('close', 0);
await vi.advanceTimersByTimeAsync(10);
const result = await resultPromise;
expect(result.status).toBe('success');
expect(result.newSessionId).toBe('session-456');
});
});

View File

@@ -13,6 +13,7 @@ import {
CONTAINER_TIMEOUT, CONTAINER_TIMEOUT,
DATA_DIR, DATA_DIR,
GROUPS_DIR, GROUPS_DIR,
IDLE_TIMEOUT,
} from './config.js'; } from './config.js';
import { logger } from './logger.js'; import { logger } from './logger.js';
import { validateAdditionalMounts } from './mount-security.js'; import { validateAdditionalMounts } from './mount-security.js';
@@ -324,6 +325,7 @@ export async function runContainerAgent(
if (parsed.newSessionId) { if (parsed.newSessionId) {
newSessionId = parsed.newSessionId; newSessionId = parsed.newSessionId;
} }
hadStreamingOutput = true;
// Activity detected — reset the hard timeout // Activity detected — reset the hard timeout
resetTimeout(); resetTimeout();
// Call onOutput for all markers (including null results) // Call onOutput for all markers (including null results)
@@ -362,7 +364,11 @@ export async function runContainerAgent(
}); });
let timedOut = false; let timedOut = false;
const timeoutMs = group.containerConfig?.timeout || CONTAINER_TIMEOUT; let hadStreamingOutput = false;
const configTimeout = group.containerConfig?.timeout || CONTAINER_TIMEOUT;
// Grace period: hard timeout must be at least IDLE_TIMEOUT + 30s so the
// graceful _close sentinel has time to trigger before the hard kill fires.
const timeoutMs = Math.max(configTimeout, IDLE_TIMEOUT + 30_000);
const killOnTimeout = () => { const killOnTimeout = () => {
timedOut = true; timedOut = true;
@@ -397,17 +403,36 @@ export async function runContainerAgent(
`Container: ${containerName}`, `Container: ${containerName}`,
`Duration: ${duration}ms`, `Duration: ${duration}ms`,
`Exit Code: ${code}`, `Exit Code: ${code}`,
`Had Streaming Output: ${hadStreamingOutput}`,
].join('\n')); ].join('\n'));
// Timeout after output = idle cleanup, not failure.
// The agent already sent its response; this is just the
// container being reaped after the idle period expired.
if (hadStreamingOutput) {
logger.info(
{ group: group.name, containerName, duration, code },
'Container timed out after output (idle cleanup)',
);
outputChain.then(() => {
resolve({
status: 'success',
result: null,
newSessionId,
});
});
return;
}
logger.error( logger.error(
{ group: group.name, containerName, duration, code }, { group: group.name, containerName, duration, code },
'Container timed out', 'Container timed out with no output',
); );
resolve({ resolve({
status: 'error', status: 'error',
result: null, result: null,
error: `Container timed out after ${group.containerConfig?.timeout || CONTAINER_TIMEOUT}ms`, error: `Container timed out after ${configTimeout}ms`,
}); });
return; return;
} }

View File

@@ -167,6 +167,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
await whatsapp.setTyping(chatJid, true); await whatsapp.setTyping(chatJid, true);
let hadError = false; let hadError = false;
let outputSentToUser = false;
const output = await runAgent(group, prompt, chatJid, async (result) => { const output = await runAgent(group, prompt, chatJid, async (result) => {
// Streaming output callback — called for each agent result // Streaming output callback — called for each agent result
@@ -177,6 +178,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`); logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`);
if (text) { if (text) {
await whatsapp.sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`); await whatsapp.sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`);
outputSentToUser = true;
} }
// Only reset idle timer on actual results, not session-update markers (result: null) // Only reset idle timer on actual results, not session-update markers (result: null)
resetIdleTimer(); resetIdleTimer();
@@ -191,6 +193,12 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
if (idleTimer) clearTimeout(idleTimer); if (idleTimer) clearTimeout(idleTimer);
if (output === 'error' || hadError) { if (output === 'error' || hadError) {
// If we already sent output to the user, don't roll back the cursor —
// the user got their response and re-processing would send duplicates.
if (outputSentToUser) {
logger.warn({ group: group.name }, 'Agent error after output was sent, skipping cursor rollback to prevent duplicates');
return true;
}
// Roll back cursor so retries can re-process these messages // Roll back cursor so retries can re-process these messages
lastAgentTimestamp[chatJid] = previousCursor; lastAgentTimestamp[chatJid] = previousCursor;
saveState(); saveState();

View File

@@ -1,327 +0,0 @@
import { Api, Bot } from 'grammy';
import {
ASSISTANT_NAME,
TRIGGER_PATTERN,
} from './config.js';
import {
getAllRegisteredGroups,
storeChatMetadata,
storeMessageDirect,
} from './db.js';
import { logger } from './logger.js';
let bot: Bot | null = null;
// Bot pool for agent teams: send-only Api instances (no polling)
const poolApis: Api[] = [];
// Current display name for each pool bot (from getMe at startup)
const poolBotNames: string[] = [];
// Maps "{groupFolder}:{senderName}" → pool Api index for stable assignment
const senderBotMap = new Map<string, number>();
// Tracks which pool indices are already claimed this session
const assignedIndices = new Set<number>();
/** Store a placeholder message for non-text content (photos, voice, etc.) */
function storeNonTextMessage(ctx: any, placeholder: string): void {
const chatId = `tg:${ctx.chat.id}`;
const registeredGroups = getAllRegisteredGroups();
if (!registeredGroups[chatId]) return;
const timestamp = new Date(ctx.message.date * 1000).toISOString();
const senderName =
ctx.from?.first_name || ctx.from?.username || ctx.from?.id?.toString() || 'Unknown';
const caption = ctx.message.caption ? ` ${ctx.message.caption}` : '';
storeChatMetadata(chatId, timestamp);
storeMessageDirect({
id: ctx.message.message_id.toString(),
chat_jid: chatId,
sender: ctx.from?.id?.toString() || '',
sender_name: senderName,
content: `${placeholder}${caption}`,
timestamp,
is_from_me: false,
});
}
export async function connectTelegram(botToken: string): Promise<void> {
bot = new Bot(botToken);
// Command to get chat ID (useful for registration)
bot.command('chatid', (ctx) => {
const chatId = ctx.chat.id;
const chatType = ctx.chat.type;
const chatName =
chatType === 'private'
? ctx.from?.first_name || 'Private'
: (ctx.chat as any).title || 'Unknown';
ctx.reply(
`Chat ID: \`tg:${chatId}\`\nName: ${chatName}\nType: ${chatType}`,
{ parse_mode: 'Markdown' },
);
});
// Command to check bot status
bot.command('ping', (ctx) => {
ctx.reply(`${ASSISTANT_NAME} is online.`);
});
bot.on('message:text', async (ctx) => {
// Skip commands
if (ctx.message.text.startsWith('/')) return;
const chatId = `tg:${ctx.chat.id}`;
let content = ctx.message.text;
const timestamp = new Date(ctx.message.date * 1000).toISOString();
const senderName =
ctx.from?.first_name ||
ctx.from?.username ||
ctx.from?.id.toString() ||
'Unknown';
const sender = ctx.from?.id.toString() || '';
const msgId = ctx.message.message_id.toString();
// Determine chat name
const chatName =
ctx.chat.type === 'private'
? senderName
: (ctx.chat as any).title || chatId;
// Translate Telegram @bot_username mentions into TRIGGER_PATTERN format.
// Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN
// (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned.
const botUsername = ctx.me?.username?.toLowerCase();
if (botUsername) {
const entities = ctx.message.entities || [];
const isBotMentioned = entities.some((entity) => {
if (entity.type === 'mention') {
const mentionText = content
.substring(entity.offset, entity.offset + entity.length)
.toLowerCase();
return mentionText === `@${botUsername}`;
}
return false;
});
if (isBotMentioned && !TRIGGER_PATTERN.test(content)) {
content = `@${ASSISTANT_NAME} ${content}`;
}
}
// Store chat metadata for discovery
storeChatMetadata(chatId, timestamp, chatName);
// Check if this chat is registered
const registeredGroups = getAllRegisteredGroups();
const group = registeredGroups[chatId];
if (!group) {
logger.debug(
{ chatId, chatName },
'Message from unregistered Telegram chat',
);
return;
}
// Store message — startMessageLoop() will pick it up
storeMessageDirect({
id: msgId,
chat_jid: chatId,
sender,
sender_name: senderName,
content,
timestamp,
is_from_me: false,
});
logger.info(
{ chatId, chatName, sender: senderName },
'Telegram message stored',
);
});
// Handle non-text messages with placeholders so the agent knows something was sent
bot.on('message:photo', (ctx) => storeNonTextMessage(ctx, '[Photo]'));
bot.on('message:video', (ctx) => storeNonTextMessage(ctx, '[Video]'));
bot.on('message:voice', (ctx) => storeNonTextMessage(ctx, '[Voice message]'));
bot.on('message:audio', (ctx) => storeNonTextMessage(ctx, '[Audio]'));
bot.on('message:document', (ctx) => {
const name = ctx.message.document?.file_name || 'file';
storeNonTextMessage(ctx, `[Document: ${name}]`);
});
bot.on('message:sticker', (ctx) => {
const emoji = ctx.message.sticker?.emoji || '';
storeNonTextMessage(ctx, `[Sticker ${emoji}]`);
});
bot.on('message:location', (ctx) => storeNonTextMessage(ctx, '[Location]'));
bot.on('message:contact', (ctx) => storeNonTextMessage(ctx, '[Contact]'));
// Handle errors gracefully
bot.catch((err) => {
logger.error({ err: err.message }, 'Telegram bot error');
});
// Start polling
bot.start({
onStart: (botInfo) => {
logger.info(
{ username: botInfo.username, id: botInfo.id },
'Telegram bot connected',
);
console.log(`\n Telegram bot: @${botInfo.username}`);
console.log(
` Send /chatid to the bot to get a chat's registration ID\n`,
);
},
});
}
export async function sendTelegramMessage(
chatId: string,
text: string,
): Promise<void> {
if (!bot) {
logger.warn('Telegram bot not initialized');
return;
}
try {
const numericId = chatId.replace(/^tg:/, '');
// Telegram has a 4096 character limit per message — split if needed
const MAX_LENGTH = 4096;
if (text.length <= MAX_LENGTH) {
await bot.api.sendMessage(numericId, text);
} else {
for (let i = 0; i < text.length; i += MAX_LENGTH) {
await bot.api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH));
}
}
logger.info({ chatId, length: text.length }, 'Telegram message sent');
} catch (err) {
logger.error({ chatId, err }, 'Failed to send Telegram message');
}
}
export async function setTelegramTyping(chatId: string): Promise<void> {
if (!bot) return;
try {
const numericId = chatId.replace(/^tg:/, '');
await bot.api.sendChatAction(numericId, 'typing');
} catch (err) {
logger.debug({ chatId, err }, 'Failed to send Telegram typing indicator');
}
}
/**
* Initialize send-only Api instances for the bot pool.
* Each pool bot can send messages but doesn't poll for updates.
*/
export async function initBotPool(tokens: string[]): Promise<void> {
for (const token of tokens) {
try {
const api = new Api(token);
const me = await api.getMe();
poolApis.push(api);
poolBotNames.push(me.first_name);
logger.info(
{ username: me.username, name: me.first_name, id: me.id, poolSize: poolApis.length },
'Pool bot initialized',
);
} catch (err) {
logger.error({ err }, 'Failed to initialize pool bot');
}
}
if (poolApis.length > 0) {
logger.info({ count: poolApis.length, names: poolBotNames }, 'Telegram bot pool ready');
}
}
/**
* Send a message via a pool bot assigned to the given sender name.
* Assignment priority:
* 1. Already assigned to this sender this session → reuse
* 2. A pool bot whose current name matches the sender → claim it (no rename needed)
* 3. First unassigned pool bot → claim and rename
* 4. All claimed → wrap around (reuse + rename)
*/
export async function sendPoolMessage(
chatId: string,
text: string,
sender: string,
groupFolder: string,
): Promise<void> {
if (poolApis.length === 0) {
// No pool bots — fall back to main bot
await sendTelegramMessage(chatId, text);
return;
}
const key = `${groupFolder}:${sender}`;
let idx = senderBotMap.get(key);
if (idx === undefined) {
// 1. Check if any pool bot already has this name (from a previous session)
const nameMatch = poolBotNames.findIndex(
(name, i) => name === sender && !assignedIndices.has(i),
);
if (nameMatch !== -1) {
idx = nameMatch;
assignedIndices.add(idx);
senderBotMap.set(key, idx);
logger.info({ sender, groupFolder, poolIndex: idx }, 'Matched pool bot by name');
} else {
// 2. Pick first unassigned bot
let freeIdx = -1;
for (let i = 0; i < poolApis.length; i++) {
if (!assignedIndices.has(i)) {
freeIdx = i;
break;
}
}
// 3. All assigned — wrap around to least recently used
if (freeIdx === -1) freeIdx = assignedIndices.size % poolApis.length;
idx = freeIdx;
assignedIndices.add(idx);
senderBotMap.set(key, idx);
// Rename the bot, then wait for Telegram to propagate
try {
await poolApis[idx].setMyName(sender);
poolBotNames[idx] = sender;
await new Promise((r) => setTimeout(r, 2000));
logger.info({ sender, groupFolder, poolIndex: idx }, 'Assigned and renamed pool bot');
} catch (err) {
logger.warn({ sender, err }, 'Failed to rename pool bot (sending anyway)');
}
}
}
const api = poolApis[idx];
try {
const numericId = chatId.replace(/^tg:/, '');
const MAX_LENGTH = 4096;
if (text.length <= MAX_LENGTH) {
await api.sendMessage(numericId, text);
} else {
for (let i = 0; i < text.length; i += MAX_LENGTH) {
await api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH));
}
}
logger.info({ chatId, sender, poolIndex: idx, length: text.length }, 'Pool message sent');
} catch (err) {
logger.error({ chatId, sender, err }, 'Failed to send pool message');
}
}
export function isTelegramConnected(): boolean {
return bot !== null;
}
export function stopTelegram(): void {
if (bot) {
bot.stop();
bot = null;
logger.info('Telegram bot stopped');
}
}