diff --git a/src/config.ts b/src/config.ts index 3525c76..28948ca 100644 --- a/src/config.ts +++ b/src/config.ts @@ -31,6 +31,10 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt( 10, ); // 10MB default export const IPC_POLL_INTERVAL = 1000; +export const MAX_CONCURRENT_CONTAINERS = parseInt( + process.env.MAX_CONCURRENT_CONTAINERS || '3', + 10, +); function escapeRegex(str: string): string { return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); diff --git a/src/container-runner.ts b/src/container-runner.ts index 7c45446..8f36b42 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -2,7 +2,7 @@ * Container Runner for NanoClaw * Spawns agent execution in Apple Container and handles IPC */ -import { exec, spawn } from 'child_process'; +import { ChildProcess, exec, spawn } from 'child_process'; import fs from 'fs'; import os from 'os'; import path from 'path'; @@ -38,7 +38,6 @@ export interface ContainerInput { groupFolder: string; chatJid: string; isMain: boolean; - isScheduledTask?: boolean; } export interface ContainerOutput { @@ -51,7 +50,7 @@ export interface ContainerOutput { interface VolumeMount { hostPath: string; containerPath: string; - readonly?: boolean; + readonly: boolean; } function buildVolumeMounts( @@ -185,6 +184,7 @@ function buildContainerArgs(mounts: VolumeMount[], containerName: string): strin export async function runContainerAgent( group: RegisteredGroup, input: ContainerInput, + onProcess: (proc: ChildProcess) => void, ): Promise { const startTime = Date.now(); @@ -227,6 +227,8 @@ export async function runContainerAgent( stdio: ['pipe', 'pipe', 'pipe'], }); + onProcess(container); + let stdout = ''; let stderr = ''; let stdoutTruncated = false; diff --git a/src/db.ts b/src/db.ts index 2b2fa77..a5bd049 100644 --- a/src/db.ts +++ b/src/db.ts @@ -4,8 +4,8 @@ import path from 'path'; import { proto } from '@whiskeysockets/baileys'; -import { STORE_DIR } from './config.js'; -import { NewMessage, ScheduledTask, TaskRunLog } from './types.js'; +import { DATA_DIR, STORE_DIR } from './config.js'; +import { NewMessage, RegisteredGroup, ScheduledTask, TaskRunLog } from './types.js'; let db: Database.Database; @@ -77,6 +77,29 @@ export function initDatabase(): void { } catch { /* column already exists */ } + + // State tables (replacing JSON files) + db.exec(` + CREATE TABLE IF NOT EXISTS router_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS sessions ( + group_folder TEXT PRIMARY KEY, + session_id TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS registered_groups ( + jid TEXT PRIMARY KEY, + name TEXT NOT NULL, + folder TEXT NOT NULL UNIQUE, + trigger_pattern TEXT NOT NULL, + added_at TEXT NOT NULL, + container_config TEXT + ); + `); + + // Migrate from JSON files if they exist + migrateJsonState(); } /** @@ -381,16 +404,171 @@ export function logTaskRun(log: TaskRunLog): void { ); } -export function getTaskRunLogs(taskId: string, limit = 10): TaskRunLog[] { - return db - .prepare( - ` - SELECT task_id, run_at, duration_ms, status, result, error - FROM task_run_logs - WHERE task_id = ? - ORDER BY run_at DESC - LIMIT ? - `, - ) - .all(taskId, limit) as TaskRunLog[]; +// --- Router state accessors --- + +export function getRouterState(key: string): string | undefined { + const row = db + .prepare('SELECT value FROM router_state WHERE key = ?') + .get(key) as { value: string } | undefined; + return row?.value; +} + +export function setRouterState(key: string, value: string): void { + db.prepare( + 'INSERT OR REPLACE INTO router_state (key, value) VALUES (?, ?)', + ).run(key, value); +} + +// --- Session accessors --- + +export function getSession(groupFolder: string): string | undefined { + const row = db + .prepare('SELECT session_id FROM sessions WHERE group_folder = ?') + .get(groupFolder) as { session_id: string } | undefined; + return row?.session_id; +} + +export function setSession(groupFolder: string, sessionId: string): void { + db.prepare( + 'INSERT OR REPLACE INTO sessions (group_folder, session_id) VALUES (?, ?)', + ).run(groupFolder, sessionId); +} + +export function getAllSessions(): Record { + const rows = db + .prepare('SELECT group_folder, session_id FROM sessions') + .all() as Array<{ group_folder: string; session_id: string }>; + const result: Record = {}; + for (const row of rows) { + result[row.group_folder] = row.session_id; + } + return result; +} + +// --- Registered group accessors --- + +export function getRegisteredGroup( + jid: string, +): (RegisteredGroup & { jid: string }) | undefined { + const row = db + .prepare('SELECT * FROM registered_groups WHERE jid = ?') + .get(jid) as + | { + jid: string; + name: string; + folder: string; + trigger_pattern: string; + added_at: string; + container_config: string | null; + } + | undefined; + if (!row) return undefined; + return { + jid: row.jid, + name: row.name, + folder: row.folder, + trigger: row.trigger_pattern, + added_at: row.added_at, + containerConfig: row.container_config + ? JSON.parse(row.container_config) + : undefined, + }; +} + +export function setRegisteredGroup( + jid: string, + group: RegisteredGroup, +): void { + db.prepare( + `INSERT OR REPLACE INTO registered_groups (jid, name, folder, trigger_pattern, added_at, container_config) + VALUES (?, ?, ?, ?, ?, ?)`, + ).run( + jid, + group.name, + group.folder, + group.trigger, + group.added_at, + group.containerConfig ? JSON.stringify(group.containerConfig) : null, + ); +} + +export function getAllRegisteredGroups(): Record { + const rows = db + .prepare('SELECT * FROM registered_groups') + .all() as Array<{ + jid: string; + name: string; + folder: string; + trigger_pattern: string; + added_at: string; + container_config: string | null; + }>; + const result: Record = {}; + for (const row of rows) { + result[row.jid] = { + name: row.name, + folder: row.folder, + trigger: row.trigger_pattern, + added_at: row.added_at, + containerConfig: row.container_config + ? JSON.parse(row.container_config) + : undefined, + }; + } + return result; +} + +// --- JSON migration --- + +function migrateJsonState(): void { + const migrateFile = (filename: string) => { + const filePath = path.join(DATA_DIR, filename); + if (!fs.existsSync(filePath)) return null; + try { + const data = JSON.parse(fs.readFileSync(filePath, 'utf-8')); + fs.renameSync(filePath, `${filePath}.migrated`); + return data; + } catch { + return null; + } + }; + + // Migrate router_state.json + const routerState = migrateFile('router_state.json') as { + last_timestamp?: string; + last_agent_timestamp?: Record; + } | null; + if (routerState) { + if (routerState.last_timestamp) { + setRouterState('last_timestamp', routerState.last_timestamp); + } + if (routerState.last_agent_timestamp) { + setRouterState( + 'last_agent_timestamp', + JSON.stringify(routerState.last_agent_timestamp), + ); + } + } + + // Migrate sessions.json + const sessions = migrateFile('sessions.json') as Record< + string, + string + > | null; + if (sessions) { + for (const [folder, sessionId] of Object.entries(sessions)) { + setSession(folder, sessionId); + } + } + + // Migrate registered_groups.json + const groups = migrateFile('registered_groups.json') as Record< + string, + RegisteredGroup + > | null; + if (groups) { + for (const [jid, group] of Object.entries(groups)) { + setRegisteredGroup(jid, group); + } + } } diff --git a/src/group-queue.ts b/src/group-queue.ts new file mode 100644 index 0000000..1c6324c --- /dev/null +++ b/src/group-queue.ts @@ -0,0 +1,248 @@ +import { ChildProcess } from 'child_process'; + +import { MAX_CONCURRENT_CONTAINERS } from './config.js'; +import { logger } from './logger.js'; + +interface QueuedTask { + id: string; + groupJid: string; + fn: () => Promise; +} + +interface GroupState { + active: boolean; + pendingMessages: boolean; + pendingTasks: QueuedTask[]; + process: ChildProcess | null; +} + +export class GroupQueue { + private groups = new Map(); + private activeCount = 0; + private waitingGroups: string[] = []; + private processMessagesFn: ((groupJid: string) => Promise) | null = + null; + private shuttingDown = false; + + private getGroup(groupJid: string): GroupState { + let state = this.groups.get(groupJid); + if (!state) { + state = { + active: false, + pendingMessages: false, + pendingTasks: [], + process: null, + }; + this.groups.set(groupJid, state); + } + return state; + } + + setProcessMessagesFn(fn: (groupJid: string) => Promise): void { + this.processMessagesFn = fn; + } + + enqueueMessageCheck(groupJid: string): void { + if (this.shuttingDown) return; + + const state = this.getGroup(groupJid); + + if (state.active) { + state.pendingMessages = true; + logger.debug({ groupJid }, 'Container active, message queued'); + return; + } + + if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) { + state.pendingMessages = true; + if (!this.waitingGroups.includes(groupJid)) { + this.waitingGroups.push(groupJid); + } + logger.debug( + { groupJid, activeCount: this.activeCount }, + 'At concurrency limit, message queued', + ); + return; + } + + this.runForGroup(groupJid, 'messages'); + } + + enqueueTask(groupJid: string, taskId: string, fn: () => Promise): void { + if (this.shuttingDown) return; + + const state = this.getGroup(groupJid); + + // Prevent double-queuing of the same task + if (state.pendingTasks.some((t) => t.id === taskId)) { + logger.debug({ groupJid, taskId }, 'Task already queued, skipping'); + return; + } + + if (state.active) { + state.pendingTasks.push({ id: taskId, groupJid, fn }); + logger.debug({ groupJid, taskId }, 'Container active, task queued'); + return; + } + + if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) { + state.pendingTasks.push({ id: taskId, groupJid, fn }); + if (!this.waitingGroups.includes(groupJid)) { + this.waitingGroups.push(groupJid); + } + logger.debug( + { groupJid, taskId, activeCount: this.activeCount }, + 'At concurrency limit, task queued', + ); + return; + } + + // Run immediately + this.runTask(groupJid, { id: taskId, groupJid, fn }); + } + + registerProcess(groupJid: string, proc: ChildProcess): void { + const state = this.getGroup(groupJid); + state.process = proc; + } + + private async runForGroup( + groupJid: string, + reason: 'messages' | 'drain', + ): Promise { + const state = this.getGroup(groupJid); + state.active = true; + state.pendingMessages = false; + this.activeCount++; + + logger.debug( + { groupJid, reason, activeCount: this.activeCount }, + 'Starting container for group', + ); + + try { + if (this.processMessagesFn) { + await this.processMessagesFn(groupJid); + } + } catch (err) { + logger.error({ groupJid, err }, 'Error processing messages for group'); + } finally { + state.active = false; + state.process = null; + this.activeCount--; + this.drainGroup(groupJid); + } + } + + private async runTask(groupJid: string, task: QueuedTask): Promise { + const state = this.getGroup(groupJid); + state.active = true; + this.activeCount++; + + logger.debug( + { groupJid, taskId: task.id, activeCount: this.activeCount }, + 'Running queued task', + ); + + try { + await task.fn(); + } catch (err) { + logger.error({ groupJid, taskId: task.id, err }, 'Error running task'); + } finally { + state.active = false; + state.process = null; + this.activeCount--; + this.drainGroup(groupJid); + } + } + + private drainGroup(groupJid: string): void { + if (this.shuttingDown) return; + + const state = this.getGroup(groupJid); + + // Tasks first (they won't be re-discovered from SQLite like messages) + if (state.pendingTasks.length > 0) { + const task = state.pendingTasks.shift()!; + this.runTask(groupJid, task); + return; + } + + // Then pending messages + if (state.pendingMessages) { + this.runForGroup(groupJid, 'drain'); + return; + } + + // Nothing pending for this group; check if other groups are waiting for a slot + this.drainWaiting(); + } + + private drainWaiting(): void { + while ( + this.waitingGroups.length > 0 && + this.activeCount < MAX_CONCURRENT_CONTAINERS + ) { + const nextJid = this.waitingGroups.shift()!; + const state = this.getGroup(nextJid); + + // Prioritize tasks over messages + if (state.pendingTasks.length > 0) { + const task = state.pendingTasks.shift()!; + this.runTask(nextJid, task); + } else if (state.pendingMessages) { + this.runForGroup(nextJid, 'drain'); + } + // If neither pending, skip this group + } + } + + async shutdown(gracePeriodMs: number): Promise { + this.shuttingDown = true; + logger.info( + { activeCount: this.activeCount, gracePeriodMs }, + 'GroupQueue shutting down', + ); + + // Collect all active processes + const activeProcs: Array<{ jid: string; proc: ChildProcess }> = []; + for (const [jid, state] of this.groups) { + if (state.process && !state.process.killed) { + activeProcs.push({ jid, proc: state.process }); + } + } + + if (activeProcs.length === 0) return; + + // Send SIGTERM to all + for (const { jid, proc } of activeProcs) { + logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to container'); + proc.kill('SIGTERM'); + } + + // Wait for grace period + await new Promise((resolve) => { + const checkInterval = setInterval(() => { + const alive = activeProcs.filter( + ({ proc }) => !proc.killed && proc.exitCode === null, + ); + if (alive.length === 0) { + clearInterval(checkInterval); + resolve(); + } + }, 500); + + setTimeout(() => { + clearInterval(checkInterval); + // SIGKILL survivors + for (const { jid, proc } of activeProcs) { + if (!proc.killed && proc.exitCode === null) { + logger.warn({ jid, pid: proc.pid }, 'Sending SIGKILL to container'); + proc.kill('SIGKILL'); + } + } + resolve(); + }, gracePeriodMs); + }); + } +} diff --git a/src/index.ts b/src/index.ts index e137e58..4251659 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,27 +27,33 @@ import { } from './container-runner.js'; import { getAllChats, + getAllRegisteredGroups, + getAllSessions, getAllTasks, getLastGroupSync, getMessagesSince, getNewMessages, + getRouterState, getTaskById, initDatabase, setLastGroupSync, + setRegisteredGroup, + setRouterState, + setSession, storeChatMetadata, storeMessage, updateChatName, } from './db.js'; +import { GroupQueue } from './group-queue.js'; import { startSchedulerLoop } from './task-scheduler.js'; -import { NewMessage, RegisteredGroup, Session } from './types.js'; -import { loadJson, saveJson } from './utils.js'; +import { RegisteredGroup } from './types.js'; import { logger } from './logger.js'; const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours let sock: WASocket; let lastTimestamp = ''; -let sessions: Session = {}; +let sessions: Record = {}; let registeredGroups: Record = {}; let lastAgentTimestamp: Record = {}; // LID to phone number mapping (WhatsApp now sends LID JIDs for self-chats) @@ -57,6 +63,8 @@ let messageLoopRunning = false; let ipcWatcherRunning = false; let groupSyncTimerStarted = false; +const queue = new GroupQueue(); + /** * Translate a JID from LID format to phone format if we have a mapping. * Returns the original JID if no mapping exists. @@ -81,18 +89,12 @@ async function setTyping(jid: string, isTyping: boolean): Promise { } function loadState(): void { - const statePath = path.join(DATA_DIR, 'router_state.json'); - const state = loadJson<{ - last_timestamp?: string; - last_agent_timestamp?: Record; - }>(statePath, {}); - lastTimestamp = state.last_timestamp || ''; - lastAgentTimestamp = state.last_agent_timestamp || {}; - sessions = loadJson(path.join(DATA_DIR, 'sessions.json'), {}); - registeredGroups = loadJson( - path.join(DATA_DIR, 'registered_groups.json'), - {}, - ); + // Load from SQLite (migration from JSON happens in initDatabase) + lastTimestamp = getRouterState('last_timestamp') || ''; + const agentTs = getRouterState('last_agent_timestamp'); + lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {}; + sessions = getAllSessions(); + registeredGroups = getAllRegisteredGroups(); logger.info( { groupCount: Object.keys(registeredGroups).length }, 'State loaded', @@ -100,16 +102,16 @@ function loadState(): void { } function saveState(): void { - saveJson(path.join(DATA_DIR, 'router_state.json'), { - last_timestamp: lastTimestamp, - last_agent_timestamp: lastAgentTimestamp, - }); - saveJson(path.join(DATA_DIR, 'sessions.json'), sessions); + setRouterState('last_timestamp', lastTimestamp); + setRouterState( + 'last_agent_timestamp', + JSON.stringify(lastAgentTimestamp), + ); } function registerGroup(jid: string, group: RegisteredGroup): void { registeredGroups[jid] = group; - saveJson(path.join(DATA_DIR, 'registered_groups.json'), registeredGroups); + setRegisteredGroup(jid, group); // Create group folder const groupDir = path.join(DATA_DIR, '..', 'groups', group.folder); @@ -177,26 +179,35 @@ function getAvailableGroups(): AvailableGroup[] { })); } -async function processMessage(msg: NewMessage): Promise { - const group = registeredGroups[msg.chat_jid]; +/** + * Process all pending messages for a group. + * Called by the GroupQueue when it's this group's turn. + */ +async function processGroupMessages(chatJid: string): Promise { + const group = registeredGroups[chatJid]; if (!group) return; - const content = msg.content.trim(); const isMainGroup = group.folder === MAIN_GROUP_FOLDER; - // Main group responds to all messages; other groups require trigger prefix - if (!isMainGroup && !TRIGGER_PATTERN.test(content)) return; - - // Get all messages since last agent interaction so the session has full context - const sinceTimestamp = lastAgentTimestamp[msg.chat_jid] || ''; + // Get all messages since last agent interaction + const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; const missedMessages = getMessagesSince( - msg.chat_jid, + chatJid, sinceTimestamp, ASSISTANT_NAME, ); + if (missedMessages.length === 0) return; + + // For non-main groups, check if any message has the trigger + if (!isMainGroup) { + const hasTrigger = missedMessages.some((m) => + TRIGGER_PATTERN.test(m.content.trim()), + ); + if (!hasTrigger) return; + } + const lines = missedMessages.map((m) => { - // Escape XML special characters in content const escapeXml = (s: string) => s .replace(/&/g, '&') @@ -207,20 +218,21 @@ async function processMessage(msg: NewMessage): Promise { }); const prompt = `\n${lines.join('\n')}\n`; - if (!prompt) return; - logger.info( { group: group.name, messageCount: missedMessages.length }, - 'Processing message', + 'Processing messages', ); - await setTyping(msg.chat_jid, true); - const response = await runAgent(group, prompt, msg.chat_jid); - await setTyping(msg.chat_jid, false); + await setTyping(chatJid, true); + const response = await runAgent(group, prompt, chatJid); + await setTyping(chatJid, false); if (response) { - lastAgentTimestamp[msg.chat_jid] = msg.timestamp; - await sendMessage(msg.chat_jid, `${ASSISTANT_NAME}: ${response}`); + // Fix batching bug: advance to latest message in batch, not just the trigger + lastAgentTimestamp[chatJid] = + missedMessages[missedMessages.length - 1].timestamp; + saveState(); + await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response}`); } } @@ -258,17 +270,21 @@ async function runAgent( ); try { - const output = await runContainerAgent(group, { - prompt, - sessionId, - groupFolder: group.folder, - chatJid, - isMain, - }); + const output = await runContainerAgent( + group, + { + prompt, + sessionId, + groupFolder: group.folder, + chatJid, + isMain, + }, + (proc) => queue.registerProcess(chatJid, proc), + ); if (output.newSessionId) { sessions[group.folder] = output.newSessionId; - saveJson(path.join(DATA_DIR, 'sessions.json'), sessions); + setSession(group.folder, output.newSessionId); } if (output.status === 'error') { @@ -692,7 +708,7 @@ async function connectWhatsApp(): Promise { } } else if (connection === 'open') { logger.info('Connected to WhatsApp'); - + // Build LID to phone mapping from auth state for self-chat translation if (sock.user) { const phoneUser = sock.user.id.split(':')[0]; @@ -702,7 +718,7 @@ async function connectWhatsApp(): Promise { logger.debug({ lidUser, phoneUser }, 'LID to phone mapping set'); } } - + // Sync group metadata on startup (respects 24h cache) syncGroupMetadata().catch((err) => logger.error({ err }, 'Initial group sync failed'), @@ -720,6 +736,8 @@ async function connectWhatsApp(): Promise { sendMessage, registeredGroups: () => registeredGroups, getSessions: () => sessions, + queue, + onProcess: (groupJid, proc) => queue.registerProcess(groupJid, proc), }); startIpcWatcher(); startMessageLoop(); @@ -736,7 +754,7 @@ async function connectWhatsApp(): Promise { // Translate LID JID to phone JID if applicable const chatJid = translateJid(rawJid); - + const timestamp = new Date( Number(msg.messageTimestamp) * 1000, ).toISOString(); @@ -763,28 +781,36 @@ async function startMessageLoop(): Promise { return; } messageLoopRunning = true; + + // Wire up the queue's message processing function + queue.setProcessMessagesFn(processGroupMessages); + logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`); while (true) { try { const jids = Object.keys(registeredGroups); - const { messages } = getNewMessages(jids, lastTimestamp, ASSISTANT_NAME); + const { messages, newTimestamp } = getNewMessages( + jids, + lastTimestamp, + ASSISTANT_NAME, + ); - if (messages.length > 0) + if (messages.length > 0) { logger.info({ count: messages.length }, 'New messages'); - for (const msg of messages) { - try { - await processMessage(msg); - // Only advance timestamp after successful processing for at-least-once delivery - lastTimestamp = msg.timestamp; - saveState(); - } catch (err) { - logger.error( - { err, msg: msg.id }, - 'Error processing message, will retry', - ); - // Stop processing this batch - failed message will be retried next loop - break; + + // Advance the "seen" cursor for all messages immediately + lastTimestamp = newTimestamp; + saveState(); + + // Deduplicate by group and enqueue + const groupsWithMessages = new Set(); + for (const msg of messages) { + groupsWithMessages.add(msg.chat_jid); + } + + for (const chatJid of groupsWithMessages) { + queue.enqueueMessageCheck(chatJid); } } } catch (err) { @@ -794,6 +820,26 @@ async function startMessageLoop(): Promise { } } +/** + * Startup recovery: check for unprocessed messages in registered groups. + * Handles crash between advancing lastTimestamp and processing messages. + */ +function recoverPendingMessages(): void { + queue.setProcessMessagesFn(processGroupMessages); + + for (const [chatJid, group] of Object.entries(registeredGroups)) { + const sinceTimestamp = lastAgentTimestamp[chatJid] || ''; + const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME); + if (pending.length > 0) { + logger.info( + { group: group.name, pendingCount: pending.length }, + 'Recovery: found unprocessed messages', + ); + queue.enqueueMessageCheck(chatJid); + } + } +} + function ensureContainerSystemRunning(): void { try { execSync('container system status', { stdio: 'pipe' }); @@ -857,6 +903,17 @@ async function main(): Promise { initDatabase(); logger.info('Database initialized'); loadState(); + recoverPendingMessages(); + + // Graceful shutdown handlers + const shutdown = async (signal: string) => { + logger.info({ signal }, 'Shutdown signal received'); + await queue.shutdown(10000); + process.exit(0); + }; + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); + await connectWhatsApp(); } diff --git a/src/task-scheduler.ts b/src/task-scheduler.ts index bb3857b..6b517ae 100644 --- a/src/task-scheduler.ts +++ b/src/task-scheduler.ts @@ -1,9 +1,9 @@ +import { ChildProcess } from 'child_process'; import { CronExpressionParser } from 'cron-parser'; import fs from 'fs'; import path from 'path'; import { - DATA_DIR, GROUPS_DIR, MAIN_GROUP_FOLDER, SCHEDULER_POLL_INTERVAL, @@ -17,6 +17,7 @@ import { logTaskRun, updateTaskAfterRun, } from './db.js'; +import { GroupQueue } from './group-queue.js'; import { logger } from './logger.js'; import { RegisteredGroup, ScheduledTask } from './types.js'; @@ -24,6 +25,8 @@ export interface SchedulerDependencies { sendMessage: (jid: string, text: string) => Promise; registeredGroups: () => Record; getSessions: () => Record; + queue: GroupQueue; + onProcess: (groupJid: string, proc: ChildProcess) => void; } async function runTask( @@ -86,14 +89,17 @@ async function runTask( task.context_mode === 'group' ? sessions[task.group_folder] : undefined; try { - const output = await runContainerAgent(group, { - prompt: task.prompt, - sessionId, - groupFolder: task.group_folder, - chatJid: task.chat_jid, - isMain, - isScheduledTask: true, - }); + const output = await runContainerAgent( + group, + { + prompt: task.prompt, + sessionId, + groupFolder: task.group_folder, + chatJid: task.chat_jid, + isMain, + }, + (proc) => deps.onProcess(task.chat_jid, proc), + ); if (output.status === 'error') { error = output.error || 'Unknown error'; @@ -165,7 +171,11 @@ export function startSchedulerLoop(deps: SchedulerDependencies): void { continue; } - await runTask(currentTask, deps); + deps.queue.enqueueTask( + currentTask.chat_jid, + currentTask.id, + () => runTask(currentTask, deps), + ); } } catch (err) { logger.error({ err }, 'Error in scheduler loop'); diff --git a/src/types.ts b/src/types.ts index 5f388be..53027b4 100644 --- a/src/types.ts +++ b/src/types.ts @@ -30,7 +30,6 @@ export interface AllowedRoot { export interface ContainerConfig { additionalMounts?: AdditionalMount[]; timeout?: number; // Default: 300000 (5 minutes) - env?: Record; } export interface RegisteredGroup { @@ -41,10 +40,6 @@ export interface RegisteredGroup { containerConfig?: ContainerConfig; } -export interface Session { - [folder: string]: string; -} - export interface NewMessage { id: string; chat_jid: string; diff --git a/src/utils.ts b/src/utils.ts deleted file mode 100644 index 3cd7d0b..0000000 --- a/src/utils.ts +++ /dev/null @@ -1,18 +0,0 @@ -import fs from 'fs'; -import path from 'path'; - -export function loadJson(filePath: string, defaultValue: T): T { - try { - if (fs.existsSync(filePath)) { - return JSON.parse(fs.readFileSync(filePath, 'utf-8')); - } - } catch { - // Return default on error - } - return defaultValue; -} - -export function saveJson(filePath: string, data: unknown): void { - fs.mkdirSync(path.dirname(filePath), { recursive: true }); - fs.writeFileSync(filePath, JSON.stringify(data, null, 2)); -}