fix: address review feedback for per-group queue reliability

- Run startup recovery only after WhatsApp connects. Previously it ran
  before sock was initialized, so lastAgentTimestamp could be advanced
  while the reply could not be sent, permanently losing agent responses
- Add a 5s retry on container failure so unprocessed messages are
  retried automatically instead of waiting until a new message arrives
  for the group
- Use `container stop` in shutdown instead of sending a raw SIGTERM to
  the CLI wrapper process, ensuring proper container cleanup
- Replace unnecessary dynamic imports with static imports in processTaskIpc
- Guard JSON.parse of DB-stored last_agent_timestamp against corruption
- Validate MAX_CONCURRENT_CONTAINERS (NaN-safe, minimum 1) and raise its
  default from 3 to 5

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-02-06 16:45:00 +02:00
parent eac9a6acfd
commit 03df69e9b5
5 changed files with 64 additions and 43 deletions

View File

@@ -31,9 +31,9 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
10, 10,
); // 10MB default ); // 10MB default
export const IPC_POLL_INTERVAL = 1000; export const IPC_POLL_INTERVAL = 1000;
export const MAX_CONCURRENT_CONTAINERS = parseInt( export const MAX_CONCURRENT_CONTAINERS = Math.max(
process.env.MAX_CONCURRENT_CONTAINERS || '3', 1,
10, parseInt(process.env.MAX_CONCURRENT_CONTAINERS || '5', 10) || 5,
); );
function escapeRegex(str: string): string { function escapeRegex(str: string): string {

View File

@@ -184,7 +184,7 @@ function buildContainerArgs(mounts: VolumeMount[], containerName: string): strin
export async function runContainerAgent( export async function runContainerAgent(
group: RegisteredGroup, group: RegisteredGroup,
input: ContainerInput, input: ContainerInput,
onProcess: (proc: ChildProcess) => void, onProcess: (proc: ChildProcess, containerName: string) => void,
): Promise<ContainerOutput> { ): Promise<ContainerOutput> {
const startTime = Date.now(); const startTime = Date.now();
@@ -227,7 +227,7 @@ export async function runContainerAgent(
stdio: ['pipe', 'pipe', 'pipe'], stdio: ['pipe', 'pipe', 'pipe'],
}); });
onProcess(container); onProcess(container, containerName);
let stdout = ''; let stdout = '';
let stderr = ''; let stderr = '';

View File

@@ -1,4 +1,4 @@
import { ChildProcess } from 'child_process'; import { ChildProcess, exec } from 'child_process';
import { MAX_CONCURRENT_CONTAINERS } from './config.js'; import { MAX_CONCURRENT_CONTAINERS } from './config.js';
import { logger } from './logger.js'; import { logger } from './logger.js';
@@ -14,13 +14,14 @@ interface GroupState {
pendingMessages: boolean; pendingMessages: boolean;
pendingTasks: QueuedTask[]; pendingTasks: QueuedTask[];
process: ChildProcess | null; process: ChildProcess | null;
containerName: string | null;
} }
export class GroupQueue { export class GroupQueue {
private groups = new Map<string, GroupState>(); private groups = new Map<string, GroupState>();
private activeCount = 0; private activeCount = 0;
private waitingGroups: string[] = []; private waitingGroups: string[] = [];
private processMessagesFn: ((groupJid: string) => Promise<void>) | null = private processMessagesFn: ((groupJid: string) => Promise<boolean>) | null =
null; null;
private shuttingDown = false; private shuttingDown = false;
@@ -32,13 +33,14 @@ export class GroupQueue {
pendingMessages: false, pendingMessages: false,
pendingTasks: [], pendingTasks: [],
process: null, process: null,
containerName: null,
}; };
this.groups.set(groupJid, state); this.groups.set(groupJid, state);
} }
return state; return state;
} }
setProcessMessagesFn(fn: (groupJid: string) => Promise<void>): void { setProcessMessagesFn(fn: (groupJid: string) => Promise<boolean>): void {
this.processMessagesFn = fn; this.processMessagesFn = fn;
} }
@@ -101,9 +103,10 @@ export class GroupQueue {
this.runTask(groupJid, { id: taskId, groupJid, fn }); this.runTask(groupJid, { id: taskId, groupJid, fn });
} }
registerProcess(groupJid: string, proc: ChildProcess): void { registerProcess(groupJid: string, proc: ChildProcess, containerName: string): void {
const state = this.getGroup(groupJid); const state = this.getGroup(groupJid);
state.process = proc; state.process = proc;
state.containerName = containerName;
} }
private async runForGroup( private async runForGroup(
@@ -122,13 +125,27 @@ export class GroupQueue {
try { try {
if (this.processMessagesFn) { if (this.processMessagesFn) {
await this.processMessagesFn(groupJid); const success = await this.processMessagesFn(groupJid);
if (!success) {
logger.info({ groupJid }, 'Processing failed, scheduling retry');
setTimeout(() => {
if (!this.shuttingDown) {
this.enqueueMessageCheck(groupJid);
}
}, 5000);
}
} }
} catch (err) { } catch (err) {
logger.error({ groupJid, err }, 'Error processing messages for group'); logger.error({ groupJid, err }, 'Error processing messages for group');
setTimeout(() => {
if (!this.shuttingDown) {
this.enqueueMessageCheck(groupJid);
}
}, 5000);
} finally { } finally {
state.active = false; state.active = false;
state.process = null; state.process = null;
state.containerName = null;
this.activeCount--; this.activeCount--;
this.drainGroup(groupJid); this.drainGroup(groupJid);
} }
@@ -151,6 +168,7 @@ export class GroupQueue {
} finally { } finally {
state.active = false; state.active = false;
state.process = null; state.process = null;
state.containerName = null;
this.activeCount--; this.activeCount--;
this.drainGroup(groupJid); this.drainGroup(groupJid);
} }
@@ -205,20 +223,25 @@ export class GroupQueue {
); );
// Collect all active processes // Collect all active processes
const activeProcs: Array<{ jid: string; proc: ChildProcess }> = []; const activeProcs: Array<{ jid: string; proc: ChildProcess; containerName: string | null }> = [];
for (const [jid, state] of this.groups) { for (const [jid, state] of this.groups) {
if (state.process && !state.process.killed) { if (state.process && !state.process.killed) {
activeProcs.push({ jid, proc: state.process }); activeProcs.push({ jid, proc: state.process, containerName: state.containerName });
} }
} }
if (activeProcs.length === 0) return; if (activeProcs.length === 0) return;
// Send SIGTERM to all // Stop all active containers gracefully
for (const { jid, proc } of activeProcs) { for (const { jid, proc, containerName } of activeProcs) {
logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to container'); if (containerName) {
logger.info({ jid, containerName }, 'Stopping container');
exec(`container stop ${containerName}`);
} else {
logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to process');
proc.kill('SIGTERM'); proc.kill('SIGTERM');
} }
}
// Wait for grace period // Wait for grace period
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {

View File

@@ -8,6 +8,7 @@ import makeWASocket, {
makeCacheableSignalKeyStore, makeCacheableSignalKeyStore,
useMultiFileAuthState, useMultiFileAuthState,
} from '@whiskeysockets/baileys'; } from '@whiskeysockets/baileys';
import { CronExpressionParser } from 'cron-parser';
import { import {
ASSISTANT_NAME, ASSISTANT_NAME,
@@ -26,6 +27,8 @@ import {
writeTasksSnapshot, writeTasksSnapshot,
} from './container-runner.js'; } from './container-runner.js';
import { import {
createTask,
deleteTask,
getAllChats, getAllChats,
getAllRegisteredGroups, getAllRegisteredGroups,
getAllSessions, getAllSessions,
@@ -43,6 +46,7 @@ import {
storeChatMetadata, storeChatMetadata,
storeMessage, storeMessage,
updateChatName, updateChatName,
updateTask,
} from './db.js'; } from './db.js';
import { GroupQueue } from './group-queue.js'; import { GroupQueue } from './group-queue.js';
import { startSchedulerLoop } from './task-scheduler.js'; import { startSchedulerLoop } from './task-scheduler.js';
@@ -92,7 +96,12 @@ function loadState(): void {
// Load from SQLite (migration from JSON happens in initDatabase) // Load from SQLite (migration from JSON happens in initDatabase)
lastTimestamp = getRouterState('last_timestamp') || ''; lastTimestamp = getRouterState('last_timestamp') || '';
const agentTs = getRouterState('last_agent_timestamp'); const agentTs = getRouterState('last_agent_timestamp');
try {
lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {}; lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {};
} catch {
logger.warn('Corrupted last_agent_timestamp in DB, resetting');
lastAgentTimestamp = {};
}
sessions = getAllSessions(); sessions = getAllSessions();
registeredGroups = getAllRegisteredGroups(); registeredGroups = getAllRegisteredGroups();
logger.info( logger.info(
@@ -183,9 +192,9 @@ function getAvailableGroups(): AvailableGroup[] {
* Process all pending messages for a group. * Process all pending messages for a group.
* Called by the GroupQueue when it's this group's turn. * Called by the GroupQueue when it's this group's turn.
*/ */
async function processGroupMessages(chatJid: string): Promise<void> { async function processGroupMessages(chatJid: string): Promise<boolean> {
const group = registeredGroups[chatJid]; const group = registeredGroups[chatJid];
if (!group) return; if (!group) return true;
const isMainGroup = group.folder === MAIN_GROUP_FOLDER; const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
@@ -197,14 +206,14 @@ async function processGroupMessages(chatJid: string): Promise<void> {
ASSISTANT_NAME, ASSISTANT_NAME,
); );
if (missedMessages.length === 0) return; if (missedMessages.length === 0) return true;
// For non-main groups, check if any message has the trigger // For non-main groups, check if any message has the trigger
if (!isMainGroup) { if (!isMainGroup) {
const hasTrigger = missedMessages.some((m) => const hasTrigger = missedMessages.some((m) =>
TRIGGER_PATTERN.test(m.content.trim()), TRIGGER_PATTERN.test(m.content.trim()),
); );
if (!hasTrigger) return; if (!hasTrigger) return true;
} }
const lines = missedMessages.map((m) => { const lines = missedMessages.map((m) => {
@@ -233,7 +242,9 @@ async function processGroupMessages(chatJid: string): Promise<void> {
missedMessages[missedMessages.length - 1].timestamp; missedMessages[missedMessages.length - 1].timestamp;
saveState(); saveState();
await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response}`); await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response}`);
return true;
} }
return false;
} }
async function runAgent( async function runAgent(
@@ -279,7 +290,7 @@ async function runAgent(
chatJid, chatJid,
isMain, isMain,
}, },
(proc) => queue.registerProcess(chatJid, proc), (proc, containerName) => queue.registerProcess(chatJid, proc, containerName),
); );
if (output.newSessionId) { if (output.newSessionId) {
@@ -453,15 +464,6 @@ async function processTaskIpc(
sourceGroup: string, // Verified identity from IPC directory sourceGroup: string, // Verified identity from IPC directory
isMain: boolean, // Verified from directory path isMain: boolean, // Verified from directory path
): Promise<void> { ): Promise<void> {
// Import db functions dynamically to avoid circular deps
const {
createTask,
updateTask,
deleteTask,
getTaskById: getTask,
} = await import('./db.js');
const { CronExpressionParser } = await import('cron-parser');
switch (data.type) { switch (data.type) {
case 'schedule_task': case 'schedule_task':
if ( if (
@@ -557,7 +559,7 @@ async function processTaskIpc(
case 'pause_task': case 'pause_task':
if (data.taskId) { if (data.taskId) {
const task = getTask(data.taskId); const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) { if (task && (isMain || task.group_folder === sourceGroup)) {
updateTask(data.taskId, { status: 'paused' }); updateTask(data.taskId, { status: 'paused' });
logger.info( logger.info(
@@ -575,7 +577,7 @@ async function processTaskIpc(
case 'resume_task': case 'resume_task':
if (data.taskId) { if (data.taskId) {
const task = getTask(data.taskId); const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) { if (task && (isMain || task.group_folder === sourceGroup)) {
updateTask(data.taskId, { status: 'active' }); updateTask(data.taskId, { status: 'active' });
logger.info( logger.info(
@@ -593,7 +595,7 @@ async function processTaskIpc(
case 'cancel_task': case 'cancel_task':
if (data.taskId) { if (data.taskId) {
const task = getTask(data.taskId); const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) { if (task && (isMain || task.group_folder === sourceGroup)) {
deleteTask(data.taskId); deleteTask(data.taskId);
logger.info( logger.info(
@@ -619,9 +621,7 @@ async function processTaskIpc(
await syncGroupMetadata(true); await syncGroupMetadata(true);
// Write updated snapshot immediately // Write updated snapshot immediately
const availableGroups = getAvailableGroups(); const availableGroups = getAvailableGroups();
const { writeGroupsSnapshot: writeGroups } = writeGroupsSnapshot(
await import('./container-runner.js');
writeGroups(
sourceGroup, sourceGroup,
true, true,
availableGroups, availableGroups,
@@ -737,10 +737,11 @@ async function connectWhatsApp(): Promise<void> {
registeredGroups: () => registeredGroups, registeredGroups: () => registeredGroups,
getSessions: () => sessions, getSessions: () => sessions,
queue, queue,
onProcess: (groupJid, proc) => queue.registerProcess(groupJid, proc), onProcess: (groupJid, proc, containerName) => queue.registerProcess(groupJid, proc, containerName),
}); });
startIpcWatcher(); startIpcWatcher();
startMessageLoop(); startMessageLoop();
recoverPendingMessages();
} }
}); });
@@ -825,8 +826,6 @@ async function startMessageLoop(): Promise<void> {
* Handles crash between advancing lastTimestamp and processing messages. * Handles crash between advancing lastTimestamp and processing messages.
*/ */
function recoverPendingMessages(): void { function recoverPendingMessages(): void {
queue.setProcessMessagesFn(processGroupMessages);
for (const [chatJid, group] of Object.entries(registeredGroups)) { for (const [chatJid, group] of Object.entries(registeredGroups)) {
const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME); const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME);
@@ -903,7 +902,6 @@ async function main(): Promise<void> {
initDatabase(); initDatabase();
logger.info('Database initialized'); logger.info('Database initialized');
loadState(); loadState();
recoverPendingMessages();
// Graceful shutdown handlers // Graceful shutdown handlers
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {

View File

@@ -26,7 +26,7 @@ export interface SchedulerDependencies {
registeredGroups: () => Record<string, RegisteredGroup>; registeredGroups: () => Record<string, RegisteredGroup>;
getSessions: () => Record<string, string>; getSessions: () => Record<string, string>;
queue: GroupQueue; queue: GroupQueue;
onProcess: (groupJid: string, proc: ChildProcess) => void; onProcess: (groupJid: string, proc: ChildProcess, containerName: string) => void;
} }
async function runTask( async function runTask(
@@ -98,7 +98,7 @@ async function runTask(
chatJid: task.chat_jid, chatJid: task.chat_jid,
isMain, isMain,
}, },
(proc) => deps.onProcess(task.chat_jid, proc), (proc, containerName) => deps.onProcess(task.chat_jid, proc, containerName),
); );
if (output.status === 'error') { if (output.status === 'error') {