diff --git a/src/config.ts b/src/config.ts index 28948ca..ebf3227 100644 --- a/src/config.ts +++ b/src/config.ts @@ -31,9 +31,9 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt( 10, ); // 10MB default export const IPC_POLL_INTERVAL = 1000; -export const MAX_CONCURRENT_CONTAINERS = parseInt( - process.env.MAX_CONCURRENT_CONTAINERS || '3', - 10, +export const MAX_CONCURRENT_CONTAINERS = Math.max( + 1, + parseInt(process.env.MAX_CONCURRENT_CONTAINERS || '5', 10) || 5, ); function escapeRegex(str: string): string { diff --git a/src/container-runner.ts b/src/container-runner.ts index 8f36b42..224afbe 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -184,7 +184,7 @@ function buildContainerArgs(mounts: VolumeMount[], containerName: string): strin export async function runContainerAgent( group: RegisteredGroup, input: ContainerInput, - onProcess: (proc: ChildProcess) => void, + onProcess: (proc: ChildProcess, containerName: string) => void, ): Promise { const startTime = Date.now(); @@ -227,7 +227,7 @@ export async function runContainerAgent( stdio: ['pipe', 'pipe', 'pipe'], }); - onProcess(container); + onProcess(container, containerName); let stdout = ''; let stderr = ''; diff --git a/src/group-queue.ts b/src/group-queue.ts index 1c6324c..3e44468 100644 --- a/src/group-queue.ts +++ b/src/group-queue.ts @@ -1,4 +1,4 @@ -import { ChildProcess } from 'child_process'; +import { ChildProcess, exec } from 'child_process'; import { MAX_CONCURRENT_CONTAINERS } from './config.js'; import { logger } from './logger.js'; @@ -14,13 +14,14 @@ interface GroupState { pendingMessages: boolean; pendingTasks: QueuedTask[]; process: ChildProcess | null; + containerName: string | null; } export class GroupQueue { private groups = new Map(); private activeCount = 0; private waitingGroups: string[] = []; - private processMessagesFn: ((groupJid: string) => Promise) | null = + private processMessagesFn: ((groupJid: string) => Promise) | null = null; private shuttingDown = false; @@ -32,13 +33,14 @@ export class GroupQueue { pendingMessages: false, pendingTasks: [], process: null, + containerName: null, }; this.groups.set(groupJid, state); } return state; } - setProcessMessagesFn(fn: (groupJid: string) => Promise): void { + setProcessMessagesFn(fn: (groupJid: string) => Promise): void { this.processMessagesFn = fn; } @@ -101,9 +103,10 @@ export class GroupQueue { this.runTask(groupJid, { id: taskId, groupJid, fn }); } - registerProcess(groupJid: string, proc: ChildProcess): void { + registerProcess(groupJid: string, proc: ChildProcess, containerName: string): void { const state = this.getGroup(groupJid); state.process = proc; + state.containerName = containerName; } private async runForGroup( @@ -122,13 +125,27 @@ export class GroupQueue { try { if (this.processMessagesFn) { - await this.processMessagesFn(groupJid); + const success = await this.processMessagesFn(groupJid); + if (!success) { + logger.info({ groupJid }, 'Processing failed, scheduling retry'); + setTimeout(() => { + if (!this.shuttingDown) { + this.enqueueMessageCheck(groupJid); + } + }, 5000); + } } } catch (err) { logger.error({ groupJid, err }, 'Error processing messages for group'); + setTimeout(() => { + if (!this.shuttingDown) { + this.enqueueMessageCheck(groupJid); + } + }, 5000); } finally { state.active = false; state.process = null; + state.containerName = null; this.activeCount--; this.drainGroup(groupJid); } @@ -151,6 +168,7 @@ export class GroupQueue { } finally { state.active = false; state.process = null; + state.containerName = null; this.activeCount--; this.drainGroup(groupJid); } @@ -205,19 +223,24 @@ export class GroupQueue { ); // Collect all active processes - const activeProcs: Array<{ jid: string; proc: ChildProcess }> = []; + const activeProcs: Array<{ jid: string; proc: ChildProcess; containerName: string | null }> = []; for (const [jid, state] of this.groups) { if (state.process && !state.process.killed) { - activeProcs.push({ jid, proc: state.process }); + activeProcs.push({ jid, proc: state.process, containerName: state.containerName }); } } if (activeProcs.length === 0) return; - // Send SIGTERM to all - for (const { jid, proc } of activeProcs) { - logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to container'); - proc.kill('SIGTERM'); + // Stop all active containers gracefully + for (const { jid, proc, containerName } of activeProcs) { + if (containerName) { + logger.info({ jid, containerName }, 'Stopping container'); + exec(`container stop ${containerName}`); + } else { + logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to process'); + proc.kill('SIGTERM'); + } } // Wait for grace period diff --git a/src/index.ts b/src/index.ts index 4251659..1a13ec3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,6 +8,7 @@ import makeWASocket, { makeCacheableSignalKeyStore, useMultiFileAuthState, } from '@whiskeysockets/baileys'; +import { CronExpressionParser } from 'cron-parser'; import { ASSISTANT_NAME, @@ -26,6 +27,8 @@ import { writeTasksSnapshot, } from './container-runner.js'; import { + createTask, + deleteTask, getAllChats, getAllRegisteredGroups, getAllSessions, @@ -43,6 +46,7 @@ import { storeChatMetadata, storeMessage, updateChatName, + updateTask, } from './db.js'; import { GroupQueue } from './group-queue.js'; import { startSchedulerLoop } from './task-scheduler.js'; @@ -92,7 +96,12 @@ function loadState(): void { // Load from SQLite (migration from JSON happens in initDatabase) lastTimestamp = getRouterState('last_timestamp') || ''; const agentTs = getRouterState('last_agent_timestamp'); - lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {}; + try { + lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {}; + } catch { + logger.warn('Corrupted last_agent_timestamp in DB, resetting'); + lastAgentTimestamp = {}; + } sessions = getAllSessions(); registeredGroups = getAllRegisteredGroups(); logger.info( @@ -183,9 +192,9 @@ function getAvailableGroups(): AvailableGroup[] { * Process all pending messages for a group. * Called by the GroupQueue when it's this group's turn. */ -async function processGroupMessages(chatJid: string): Promise { +async function processGroupMessages(chatJid: string): Promise { const group = registeredGroups[chatJid]; - if (!group) return; + if (!group) return true; const isMainGroup = group.folder === MAIN_GROUP_FOLDER; @@ -197,14 +206,14 @@ async function processGroupMessages(chatJid: string): Promise { ASSISTANT_NAME, ); - if (missedMessages.length === 0) return; + if (missedMessages.length === 0) return true; // For non-main groups, check if any message has the trigger if (!isMainGroup) { const hasTrigger = missedMessages.some((m) => TRIGGER_PATTERN.test(m.content.trim()), ); - if (!hasTrigger) return; + if (!hasTrigger) return true; } const lines = missedMessages.map((m) => { @@ -233,7 +242,9 @@ async function processGroupMessages(chatJid: string): Promise { missedMessages[missedMessages.length - 1].timestamp; saveState(); await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response}`); + return true; } + return false; } async function runAgent( @@ -279,7 +290,7 @@ async function runAgent( chatJid, isMain, }, - (proc) => queue.registerProcess(chatJid, proc), + (proc, containerName) => queue.registerProcess(chatJid, proc, containerName), ); if (output.newSessionId) { @@ -453,15 +464,6 @@ async function processTaskIpc( sourceGroup: string, // Verified identity from IPC directory isMain: boolean, // Verified from directory path ): Promise { - // Import db functions dynamically to avoid circular deps - const { - createTask, - updateTask, - deleteTask, - getTaskById: getTask, - } = await import('./db.js'); - const { CronExpressionParser } = await import('cron-parser'); - switch (data.type) { case 'schedule_task': if ( @@ -557,7 +559,7 @@ async function processTaskIpc( case 'pause_task': if (data.taskId) { - const task = getTask(data.taskId); + const task = getTaskById(data.taskId); if (task && (isMain || task.group_folder === sourceGroup)) { updateTask(data.taskId, { status: 'paused' }); logger.info( @@ -575,7 +577,7 @@ async function processTaskIpc( case 'resume_task': if (data.taskId) { - const task = getTask(data.taskId); + const task = getTaskById(data.taskId); if (task && (isMain || task.group_folder === sourceGroup)) { updateTask(data.taskId, { status: 'active' }); logger.info( @@ -593,7 +595,7 @@ async function processTaskIpc( case 'cancel_task': if (data.taskId) { - const task = getTask(data.taskId); + const task = getTaskById(data.taskId); if (task && (isMain || task.group_folder === sourceGroup)) { deleteTask(data.taskId); logger.info( @@ -619,9 +621,7 @@ async function processTaskIpc( await syncGroupMetadata(true); // Write updated snapshot immediately const availableGroups = getAvailableGroups(); - const { writeGroupsSnapshot: writeGroups } = - await import('./container-runner.js'); - writeGroups( + writeGroupsSnapshot( sourceGroup, true, availableGroups, @@ -737,10 +737,11 @@ async function connectWhatsApp(): Promise { registeredGroups: () => registeredGroups, getSessions: () => sessions, queue, - onProcess: (groupJid, proc) => queue.registerProcess(groupJid, proc), + onProcess: (groupJid, proc, containerName) => queue.registerProcess(groupJid, proc, containerName), }); startIpcWatcher(); startMessageLoop(); + recoverPendingMessages(); } }); @@ -825,8 +826,6 @@ async function startMessageLoop(): Promise { * Handles crash between advancing lastTimestamp and processing messages. */ function recoverPendingMessages(): void { - queue.setProcessMessagesFn(processGroupMessages); - for (const [chatJid, group] of Object.entries(registeredGroups)) { const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME); @@ -903,7 +902,6 @@ async function main(): Promise { initDatabase(); logger.info('Database initialized'); loadState(); - recoverPendingMessages(); // Graceful shutdown handlers const shutdown = async (signal: string) => { diff --git a/src/task-scheduler.ts b/src/task-scheduler.ts index 6b517ae..3508371 100644 --- a/src/task-scheduler.ts +++ b/src/task-scheduler.ts @@ -26,7 +26,7 @@ export interface SchedulerDependencies { registeredGroups: () => Record; getSessions: () => Record; queue: GroupQueue; - onProcess: (groupJid: string, proc: ChildProcess) => void; + onProcess: (groupJid: string, proc: ChildProcess, containerName: string) => void; } async function runTask( @@ -98,7 +98,7 @@ async function runTask( chatJid: task.chat_jid, isMain, }, - (proc) => deps.onProcess(task.chat_jid, proc), + (proc, containerName) => deps.onProcess(task.chat_jid, proc, containerName), ); if (output.status === 'error') {