feat: per-group queue, SQLite state, graceful shutdown

Add per-group container locking with global concurrency limit to prevent
concurrent containers for the same group (#89) and cap total containers.
Fix message batching bug where lastAgentTimestamp advanced to trigger
message instead of latest in batch, causing redundant re-processing.
Move router state, sessions, and registered groups from JSON files to
SQLite with automatic one-time migration. Add SIGTERM/SIGINT handlers
with graceful shutdown (SIGTERM -> grace period -> SIGKILL). Add startup
recovery for messages missed during crash. Remove dead code: utils.ts,
Session type, isScheduledTask flag, ContainerConfig.env, getTaskRunLogs,
GroupQueue.isActive.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-02-06 07:38:07 +02:00
parent db216a459e
commit eac9a6acfd
8 changed files with 591 additions and 115 deletions

View File

@@ -31,6 +31,10 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
10, 10,
); // 10MB default ); // 10MB default
export const IPC_POLL_INTERVAL = 1000; export const IPC_POLL_INTERVAL = 1000;
export const MAX_CONCURRENT_CONTAINERS = parseInt(
process.env.MAX_CONCURRENT_CONTAINERS || '3',
10,
);
function escapeRegex(str: string): string { function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

View File

@@ -2,7 +2,7 @@
* Container Runner for NanoClaw * Container Runner for NanoClaw
* Spawns agent execution in Apple Container and handles IPC * Spawns agent execution in Apple Container and handles IPC
*/ */
import { exec, spawn } from 'child_process'; import { ChildProcess, exec, spawn } from 'child_process';
import fs from 'fs'; import fs from 'fs';
import os from 'os'; import os from 'os';
import path from 'path'; import path from 'path';
@@ -38,7 +38,6 @@ export interface ContainerInput {
groupFolder: string; groupFolder: string;
chatJid: string; chatJid: string;
isMain: boolean; isMain: boolean;
isScheduledTask?: boolean;
} }
export interface ContainerOutput { export interface ContainerOutput {
@@ -51,7 +50,7 @@ export interface ContainerOutput {
interface VolumeMount { interface VolumeMount {
hostPath: string; hostPath: string;
containerPath: string; containerPath: string;
readonly?: boolean; readonly: boolean;
} }
function buildVolumeMounts( function buildVolumeMounts(
@@ -185,6 +184,7 @@ function buildContainerArgs(mounts: VolumeMount[], containerName: string): strin
export async function runContainerAgent( export async function runContainerAgent(
group: RegisteredGroup, group: RegisteredGroup,
input: ContainerInput, input: ContainerInput,
onProcess: (proc: ChildProcess) => void,
): Promise<ContainerOutput> { ): Promise<ContainerOutput> {
const startTime = Date.now(); const startTime = Date.now();
@@ -227,6 +227,8 @@ export async function runContainerAgent(
stdio: ['pipe', 'pipe', 'pipe'], stdio: ['pipe', 'pipe', 'pipe'],
}); });
onProcess(container);
let stdout = ''; let stdout = '';
let stderr = ''; let stderr = '';
let stdoutTruncated = false; let stdoutTruncated = false;

206
src/db.ts
View File

@@ -4,8 +4,8 @@ import path from 'path';
import { proto } from '@whiskeysockets/baileys'; import { proto } from '@whiskeysockets/baileys';
import { STORE_DIR } from './config.js'; import { DATA_DIR, STORE_DIR } from './config.js';
import { NewMessage, ScheduledTask, TaskRunLog } from './types.js'; import { NewMessage, RegisteredGroup, ScheduledTask, TaskRunLog } from './types.js';
let db: Database.Database; let db: Database.Database;
@@ -77,6 +77,29 @@ export function initDatabase(): void {
} catch { } catch {
/* column already exists */ /* column already exists */
} }
// State tables (replacing JSON files)
db.exec(`
CREATE TABLE IF NOT EXISTS router_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
group_folder TEXT PRIMARY KEY,
session_id TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS registered_groups (
jid TEXT PRIMARY KEY,
name TEXT NOT NULL,
folder TEXT NOT NULL UNIQUE,
trigger_pattern TEXT NOT NULL,
added_at TEXT NOT NULL,
container_config TEXT
);
`);
// Migrate from JSON files if they exist
migrateJsonState();
} }
/** /**
@@ -381,16 +404,171 @@ export function logTaskRun(log: TaskRunLog): void {
); );
} }
export function getTaskRunLogs(taskId: string, limit = 10): TaskRunLog[] { // --- Router state accessors ---
return db
.prepare( export function getRouterState(key: string): string | undefined {
` const row = db
SELECT task_id, run_at, duration_ms, status, result, error .prepare('SELECT value FROM router_state WHERE key = ?')
FROM task_run_logs .get(key) as { value: string } | undefined;
WHERE task_id = ? return row?.value;
ORDER BY run_at DESC }
LIMIT ?
`, export function setRouterState(key: string, value: string): void {
) db.prepare(
.all(taskId, limit) as TaskRunLog[]; 'INSERT OR REPLACE INTO router_state (key, value) VALUES (?, ?)',
).run(key, value);
}
// --- Session accessors ---
export function getSession(groupFolder: string): string | undefined {
const row = db
.prepare('SELECT session_id FROM sessions WHERE group_folder = ?')
.get(groupFolder) as { session_id: string } | undefined;
return row?.session_id;
}
export function setSession(groupFolder: string, sessionId: string): void {
db.prepare(
'INSERT OR REPLACE INTO sessions (group_folder, session_id) VALUES (?, ?)',
).run(groupFolder, sessionId);
}
export function getAllSessions(): Record<string, string> {
const rows = db
.prepare('SELECT group_folder, session_id FROM sessions')
.all() as Array<{ group_folder: string; session_id: string }>;
const result: Record<string, string> = {};
for (const row of rows) {
result[row.group_folder] = row.session_id;
}
return result;
}
// --- Registered group accessors ---
export function getRegisteredGroup(
jid: string,
): (RegisteredGroup & { jid: string }) | undefined {
const row = db
.prepare('SELECT * FROM registered_groups WHERE jid = ?')
.get(jid) as
| {
jid: string;
name: string;
folder: string;
trigger_pattern: string;
added_at: string;
container_config: string | null;
}
| undefined;
if (!row) return undefined;
return {
jid: row.jid,
name: row.name,
folder: row.folder,
trigger: row.trigger_pattern,
added_at: row.added_at,
containerConfig: row.container_config
? JSON.parse(row.container_config)
: undefined,
};
}
export function setRegisteredGroup(
jid: string,
group: RegisteredGroup,
): void {
db.prepare(
`INSERT OR REPLACE INTO registered_groups (jid, name, folder, trigger_pattern, added_at, container_config)
VALUES (?, ?, ?, ?, ?, ?)`,
).run(
jid,
group.name,
group.folder,
group.trigger,
group.added_at,
group.containerConfig ? JSON.stringify(group.containerConfig) : null,
);
}
export function getAllRegisteredGroups(): Record<string, RegisteredGroup> {
const rows = db
.prepare('SELECT * FROM registered_groups')
.all() as Array<{
jid: string;
name: string;
folder: string;
trigger_pattern: string;
added_at: string;
container_config: string | null;
}>;
const result: Record<string, RegisteredGroup> = {};
for (const row of rows) {
result[row.jid] = {
name: row.name,
folder: row.folder,
trigger: row.trigger_pattern,
added_at: row.added_at,
containerConfig: row.container_config
? JSON.parse(row.container_config)
: undefined,
};
}
return result;
}
// --- JSON migration ---
function migrateJsonState(): void {
const migrateFile = (filename: string) => {
const filePath = path.join(DATA_DIR, filename);
if (!fs.existsSync(filePath)) return null;
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
fs.renameSync(filePath, `${filePath}.migrated`);
return data;
} catch {
return null;
}
};
// Migrate router_state.json
const routerState = migrateFile('router_state.json') as {
last_timestamp?: string;
last_agent_timestamp?: Record<string, string>;
} | null;
if (routerState) {
if (routerState.last_timestamp) {
setRouterState('last_timestamp', routerState.last_timestamp);
}
if (routerState.last_agent_timestamp) {
setRouterState(
'last_agent_timestamp',
JSON.stringify(routerState.last_agent_timestamp),
);
}
}
// Migrate sessions.json
const sessions = migrateFile('sessions.json') as Record<
string,
string
> | null;
if (sessions) {
for (const [folder, sessionId] of Object.entries(sessions)) {
setSession(folder, sessionId);
}
}
// Migrate registered_groups.json
const groups = migrateFile('registered_groups.json') as Record<
string,
RegisteredGroup
> | null;
if (groups) {
for (const [jid, group] of Object.entries(groups)) {
setRegisteredGroup(jid, group);
}
}
} }

248
src/group-queue.ts Normal file
View File

@@ -0,0 +1,248 @@
import { ChildProcess } from 'child_process';
import { MAX_CONCURRENT_CONTAINERS } from './config.js';
import { logger } from './logger.js';
interface QueuedTask {
id: string;
groupJid: string;
fn: () => Promise<void>;
}
interface GroupState {
active: boolean;
pendingMessages: boolean;
pendingTasks: QueuedTask[];
process: ChildProcess | null;
}
export class GroupQueue {
private groups = new Map<string, GroupState>();
private activeCount = 0;
private waitingGroups: string[] = [];
private processMessagesFn: ((groupJid: string) => Promise<void>) | null =
null;
private shuttingDown = false;
private getGroup(groupJid: string): GroupState {
let state = this.groups.get(groupJid);
if (!state) {
state = {
active: false,
pendingMessages: false,
pendingTasks: [],
process: null,
};
this.groups.set(groupJid, state);
}
return state;
}
setProcessMessagesFn(fn: (groupJid: string) => Promise<void>): void {
this.processMessagesFn = fn;
}
enqueueMessageCheck(groupJid: string): void {
if (this.shuttingDown) return;
const state = this.getGroup(groupJid);
if (state.active) {
state.pendingMessages = true;
logger.debug({ groupJid }, 'Container active, message queued');
return;
}
if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
state.pendingMessages = true;
if (!this.waitingGroups.includes(groupJid)) {
this.waitingGroups.push(groupJid);
}
logger.debug(
{ groupJid, activeCount: this.activeCount },
'At concurrency limit, message queued',
);
return;
}
this.runForGroup(groupJid, 'messages');
}
enqueueTask(groupJid: string, taskId: string, fn: () => Promise<void>): void {
if (this.shuttingDown) return;
const state = this.getGroup(groupJid);
// Prevent double-queuing of the same task
if (state.pendingTasks.some((t) => t.id === taskId)) {
logger.debug({ groupJid, taskId }, 'Task already queued, skipping');
return;
}
if (state.active) {
state.pendingTasks.push({ id: taskId, groupJid, fn });
logger.debug({ groupJid, taskId }, 'Container active, task queued');
return;
}
if (this.activeCount >= MAX_CONCURRENT_CONTAINERS) {
state.pendingTasks.push({ id: taskId, groupJid, fn });
if (!this.waitingGroups.includes(groupJid)) {
this.waitingGroups.push(groupJid);
}
logger.debug(
{ groupJid, taskId, activeCount: this.activeCount },
'At concurrency limit, task queued',
);
return;
}
// Run immediately
this.runTask(groupJid, { id: taskId, groupJid, fn });
}
registerProcess(groupJid: string, proc: ChildProcess): void {
const state = this.getGroup(groupJid);
state.process = proc;
}
private async runForGroup(
groupJid: string,
reason: 'messages' | 'drain',
): Promise<void> {
const state = this.getGroup(groupJid);
state.active = true;
state.pendingMessages = false;
this.activeCount++;
logger.debug(
{ groupJid, reason, activeCount: this.activeCount },
'Starting container for group',
);
try {
if (this.processMessagesFn) {
await this.processMessagesFn(groupJid);
}
} catch (err) {
logger.error({ groupJid, err }, 'Error processing messages for group');
} finally {
state.active = false;
state.process = null;
this.activeCount--;
this.drainGroup(groupJid);
}
}
private async runTask(groupJid: string, task: QueuedTask): Promise<void> {
const state = this.getGroup(groupJid);
state.active = true;
this.activeCount++;
logger.debug(
{ groupJid, taskId: task.id, activeCount: this.activeCount },
'Running queued task',
);
try {
await task.fn();
} catch (err) {
logger.error({ groupJid, taskId: task.id, err }, 'Error running task');
} finally {
state.active = false;
state.process = null;
this.activeCount--;
this.drainGroup(groupJid);
}
}
private drainGroup(groupJid: string): void {
if (this.shuttingDown) return;
const state = this.getGroup(groupJid);
// Tasks first (they won't be re-discovered from SQLite like messages)
if (state.pendingTasks.length > 0) {
const task = state.pendingTasks.shift()!;
this.runTask(groupJid, task);
return;
}
// Then pending messages
if (state.pendingMessages) {
this.runForGroup(groupJid, 'drain');
return;
}
// Nothing pending for this group; check if other groups are waiting for a slot
this.drainWaiting();
}
private drainWaiting(): void {
while (
this.waitingGroups.length > 0 &&
this.activeCount < MAX_CONCURRENT_CONTAINERS
) {
const nextJid = this.waitingGroups.shift()!;
const state = this.getGroup(nextJid);
// Prioritize tasks over messages
if (state.pendingTasks.length > 0) {
const task = state.pendingTasks.shift()!;
this.runTask(nextJid, task);
} else if (state.pendingMessages) {
this.runForGroup(nextJid, 'drain');
}
// If neither pending, skip this group
}
}
async shutdown(gracePeriodMs: number): Promise<void> {
this.shuttingDown = true;
logger.info(
{ activeCount: this.activeCount, gracePeriodMs },
'GroupQueue shutting down',
);
// Collect all active processes
const activeProcs: Array<{ jid: string; proc: ChildProcess }> = [];
for (const [jid, state] of this.groups) {
if (state.process && !state.process.killed) {
activeProcs.push({ jid, proc: state.process });
}
}
if (activeProcs.length === 0) return;
// Send SIGTERM to all
for (const { jid, proc } of activeProcs) {
logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to container');
proc.kill('SIGTERM');
}
// Wait for grace period
await new Promise<void>((resolve) => {
const checkInterval = setInterval(() => {
const alive = activeProcs.filter(
({ proc }) => !proc.killed && proc.exitCode === null,
);
if (alive.length === 0) {
clearInterval(checkInterval);
resolve();
}
}, 500);
setTimeout(() => {
clearInterval(checkInterval);
// SIGKILL survivors
for (const { jid, proc } of activeProcs) {
if (!proc.killed && proc.exitCode === null) {
logger.warn({ jid, pid: proc.pid }, 'Sending SIGKILL to container');
proc.kill('SIGKILL');
}
}
resolve();
}, gracePeriodMs);
});
}
}

View File

@@ -27,27 +27,33 @@ import {
} from './container-runner.js'; } from './container-runner.js';
import { import {
getAllChats, getAllChats,
getAllRegisteredGroups,
getAllSessions,
getAllTasks, getAllTasks,
getLastGroupSync, getLastGroupSync,
getMessagesSince, getMessagesSince,
getNewMessages, getNewMessages,
getRouterState,
getTaskById, getTaskById,
initDatabase, initDatabase,
setLastGroupSync, setLastGroupSync,
setRegisteredGroup,
setRouterState,
setSession,
storeChatMetadata, storeChatMetadata,
storeMessage, storeMessage,
updateChatName, updateChatName,
} from './db.js'; } from './db.js';
import { GroupQueue } from './group-queue.js';
import { startSchedulerLoop } from './task-scheduler.js'; import { startSchedulerLoop } from './task-scheduler.js';
import { NewMessage, RegisteredGroup, Session } from './types.js'; import { RegisteredGroup } from './types.js';
import { loadJson, saveJson } from './utils.js';
import { logger } from './logger.js'; import { logger } from './logger.js';
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
let sock: WASocket; let sock: WASocket;
let lastTimestamp = ''; let lastTimestamp = '';
let sessions: Session = {}; let sessions: Record<string, string> = {};
let registeredGroups: Record<string, RegisteredGroup> = {}; let registeredGroups: Record<string, RegisteredGroup> = {};
let lastAgentTimestamp: Record<string, string> = {}; let lastAgentTimestamp: Record<string, string> = {};
// LID to phone number mapping (WhatsApp now sends LID JIDs for self-chats) // LID to phone number mapping (WhatsApp now sends LID JIDs for self-chats)
@@ -57,6 +63,8 @@ let messageLoopRunning = false;
let ipcWatcherRunning = false; let ipcWatcherRunning = false;
let groupSyncTimerStarted = false; let groupSyncTimerStarted = false;
const queue = new GroupQueue();
/** /**
* Translate a JID from LID format to phone format if we have a mapping. * Translate a JID from LID format to phone format if we have a mapping.
* Returns the original JID if no mapping exists. * Returns the original JID if no mapping exists.
@@ -81,18 +89,12 @@ async function setTyping(jid: string, isTyping: boolean): Promise<void> {
} }
function loadState(): void { function loadState(): void {
const statePath = path.join(DATA_DIR, 'router_state.json'); // Load from SQLite (migration from JSON happens in initDatabase)
const state = loadJson<{ lastTimestamp = getRouterState('last_timestamp') || '';
last_timestamp?: string; const agentTs = getRouterState('last_agent_timestamp');
last_agent_timestamp?: Record<string, string>; lastAgentTimestamp = agentTs ? JSON.parse(agentTs) : {};
}>(statePath, {}); sessions = getAllSessions();
lastTimestamp = state.last_timestamp || ''; registeredGroups = getAllRegisteredGroups();
lastAgentTimestamp = state.last_agent_timestamp || {};
sessions = loadJson(path.join(DATA_DIR, 'sessions.json'), {});
registeredGroups = loadJson(
path.join(DATA_DIR, 'registered_groups.json'),
{},
);
logger.info( logger.info(
{ groupCount: Object.keys(registeredGroups).length }, { groupCount: Object.keys(registeredGroups).length },
'State loaded', 'State loaded',
@@ -100,16 +102,16 @@ function loadState(): void {
} }
function saveState(): void { function saveState(): void {
saveJson(path.join(DATA_DIR, 'router_state.json'), { setRouterState('last_timestamp', lastTimestamp);
last_timestamp: lastTimestamp, setRouterState(
last_agent_timestamp: lastAgentTimestamp, 'last_agent_timestamp',
}); JSON.stringify(lastAgentTimestamp),
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions); );
} }
function registerGroup(jid: string, group: RegisteredGroup): void { function registerGroup(jid: string, group: RegisteredGroup): void {
registeredGroups[jid] = group; registeredGroups[jid] = group;
saveJson(path.join(DATA_DIR, 'registered_groups.json'), registeredGroups); setRegisteredGroup(jid, group);
// Create group folder // Create group folder
const groupDir = path.join(DATA_DIR, '..', 'groups', group.folder); const groupDir = path.join(DATA_DIR, '..', 'groups', group.folder);
@@ -177,26 +179,35 @@ function getAvailableGroups(): AvailableGroup[] {
})); }));
} }
async function processMessage(msg: NewMessage): Promise<void> { /**
const group = registeredGroups[msg.chat_jid]; * Process all pending messages for a group.
* Called by the GroupQueue when it's this group's turn.
*/
async function processGroupMessages(chatJid: string): Promise<void> {
const group = registeredGroups[chatJid];
if (!group) return; if (!group) return;
const content = msg.content.trim();
const isMainGroup = group.folder === MAIN_GROUP_FOLDER; const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
// Main group responds to all messages; other groups require trigger prefix // Get all messages since last agent interaction
if (!isMainGroup && !TRIGGER_PATTERN.test(content)) return; const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
// Get all messages since last agent interaction so the session has full context
const sinceTimestamp = lastAgentTimestamp[msg.chat_jid] || '';
const missedMessages = getMessagesSince( const missedMessages = getMessagesSince(
msg.chat_jid, chatJid,
sinceTimestamp, sinceTimestamp,
ASSISTANT_NAME, ASSISTANT_NAME,
); );
if (missedMessages.length === 0) return;
// For non-main groups, check if any message has the trigger
if (!isMainGroup) {
const hasTrigger = missedMessages.some((m) =>
TRIGGER_PATTERN.test(m.content.trim()),
);
if (!hasTrigger) return;
}
const lines = missedMessages.map((m) => { const lines = missedMessages.map((m) => {
// Escape XML special characters in content
const escapeXml = (s: string) => const escapeXml = (s: string) =>
s s
.replace(/&/g, '&amp;') .replace(/&/g, '&amp;')
@@ -207,20 +218,21 @@ async function processMessage(msg: NewMessage): Promise<void> {
}); });
const prompt = `<messages>\n${lines.join('\n')}\n</messages>`; const prompt = `<messages>\n${lines.join('\n')}\n</messages>`;
if (!prompt) return;
logger.info( logger.info(
{ group: group.name, messageCount: missedMessages.length }, { group: group.name, messageCount: missedMessages.length },
'Processing message', 'Processing messages',
); );
await setTyping(msg.chat_jid, true); await setTyping(chatJid, true);
const response = await runAgent(group, prompt, msg.chat_jid); const response = await runAgent(group, prompt, chatJid);
await setTyping(msg.chat_jid, false); await setTyping(chatJid, false);
if (response) { if (response) {
lastAgentTimestamp[msg.chat_jid] = msg.timestamp; // Fix batching bug: advance to latest message in batch, not just the trigger
await sendMessage(msg.chat_jid, `${ASSISTANT_NAME}: ${response}`); lastAgentTimestamp[chatJid] =
missedMessages[missedMessages.length - 1].timestamp;
saveState();
await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response}`);
} }
} }
@@ -258,17 +270,21 @@ async function runAgent(
); );
try { try {
const output = await runContainerAgent(group, { const output = await runContainerAgent(
prompt, group,
sessionId, {
groupFolder: group.folder, prompt,
chatJid, sessionId,
isMain, groupFolder: group.folder,
}); chatJid,
isMain,
},
(proc) => queue.registerProcess(chatJid, proc),
);
if (output.newSessionId) { if (output.newSessionId) {
sessions[group.folder] = output.newSessionId; sessions[group.folder] = output.newSessionId;
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions); setSession(group.folder, output.newSessionId);
} }
if (output.status === 'error') { if (output.status === 'error') {
@@ -720,6 +736,8 @@ async function connectWhatsApp(): Promise<void> {
sendMessage, sendMessage,
registeredGroups: () => registeredGroups, registeredGroups: () => registeredGroups,
getSessions: () => sessions, getSessions: () => sessions,
queue,
onProcess: (groupJid, proc) => queue.registerProcess(groupJid, proc),
}); });
startIpcWatcher(); startIpcWatcher();
startMessageLoop(); startMessageLoop();
@@ -763,28 +781,36 @@ async function startMessageLoop(): Promise<void> {
return; return;
} }
messageLoopRunning = true; messageLoopRunning = true;
// Wire up the queue's message processing function
queue.setProcessMessagesFn(processGroupMessages);
logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`); logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`);
while (true) { while (true) {
try { try {
const jids = Object.keys(registeredGroups); const jids = Object.keys(registeredGroups);
const { messages } = getNewMessages(jids, lastTimestamp, ASSISTANT_NAME); const { messages, newTimestamp } = getNewMessages(
jids,
lastTimestamp,
ASSISTANT_NAME,
);
if (messages.length > 0) if (messages.length > 0) {
logger.info({ count: messages.length }, 'New messages'); logger.info({ count: messages.length }, 'New messages');
for (const msg of messages) {
try { // Advance the "seen" cursor for all messages immediately
await processMessage(msg); lastTimestamp = newTimestamp;
// Only advance timestamp after successful processing for at-least-once delivery saveState();
lastTimestamp = msg.timestamp;
saveState(); // Deduplicate by group and enqueue
} catch (err) { const groupsWithMessages = new Set<string>();
logger.error( for (const msg of messages) {
{ err, msg: msg.id }, groupsWithMessages.add(msg.chat_jid);
'Error processing message, will retry', }
);
// Stop processing this batch - failed message will be retried next loop for (const chatJid of groupsWithMessages) {
break; queue.enqueueMessageCheck(chatJid);
} }
} }
} catch (err) { } catch (err) {
@@ -794,6 +820,26 @@ async function startMessageLoop(): Promise<void> {
} }
} }
/**
* Startup recovery: check for unprocessed messages in registered groups.
* Handles crash between advancing lastTimestamp and processing messages.
*/
function recoverPendingMessages(): void {
queue.setProcessMessagesFn(processGroupMessages);
for (const [chatJid, group] of Object.entries(registeredGroups)) {
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
const pending = getMessagesSince(chatJid, sinceTimestamp, ASSISTANT_NAME);
if (pending.length > 0) {
logger.info(
{ group: group.name, pendingCount: pending.length },
'Recovery: found unprocessed messages',
);
queue.enqueueMessageCheck(chatJid);
}
}
}
function ensureContainerSystemRunning(): void { function ensureContainerSystemRunning(): void {
try { try {
execSync('container system status', { stdio: 'pipe' }); execSync('container system status', { stdio: 'pipe' });
@@ -857,6 +903,17 @@ async function main(): Promise<void> {
initDatabase(); initDatabase();
logger.info('Database initialized'); logger.info('Database initialized');
loadState(); loadState();
recoverPendingMessages();
// Graceful shutdown handlers
const shutdown = async (signal: string) => {
logger.info({ signal }, 'Shutdown signal received');
await queue.shutdown(10000);
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
await connectWhatsApp(); await connectWhatsApp();
} }

View File

@@ -1,9 +1,9 @@
import { ChildProcess } from 'child_process';
import { CronExpressionParser } from 'cron-parser'; import { CronExpressionParser } from 'cron-parser';
import fs from 'fs'; import fs from 'fs';
import path from 'path'; import path from 'path';
import { import {
DATA_DIR,
GROUPS_DIR, GROUPS_DIR,
MAIN_GROUP_FOLDER, MAIN_GROUP_FOLDER,
SCHEDULER_POLL_INTERVAL, SCHEDULER_POLL_INTERVAL,
@@ -17,6 +17,7 @@ import {
logTaskRun, logTaskRun,
updateTaskAfterRun, updateTaskAfterRun,
} from './db.js'; } from './db.js';
import { GroupQueue } from './group-queue.js';
import { logger } from './logger.js'; import { logger } from './logger.js';
import { RegisteredGroup, ScheduledTask } from './types.js'; import { RegisteredGroup, ScheduledTask } from './types.js';
@@ -24,6 +25,8 @@ export interface SchedulerDependencies {
sendMessage: (jid: string, text: string) => Promise<void>; sendMessage: (jid: string, text: string) => Promise<void>;
registeredGroups: () => Record<string, RegisteredGroup>; registeredGroups: () => Record<string, RegisteredGroup>;
getSessions: () => Record<string, string>; getSessions: () => Record<string, string>;
queue: GroupQueue;
onProcess: (groupJid: string, proc: ChildProcess) => void;
} }
async function runTask( async function runTask(
@@ -86,14 +89,17 @@ async function runTask(
task.context_mode === 'group' ? sessions[task.group_folder] : undefined; task.context_mode === 'group' ? sessions[task.group_folder] : undefined;
try { try {
const output = await runContainerAgent(group, { const output = await runContainerAgent(
prompt: task.prompt, group,
sessionId, {
groupFolder: task.group_folder, prompt: task.prompt,
chatJid: task.chat_jid, sessionId,
isMain, groupFolder: task.group_folder,
isScheduledTask: true, chatJid: task.chat_jid,
}); isMain,
},
(proc) => deps.onProcess(task.chat_jid, proc),
);
if (output.status === 'error') { if (output.status === 'error') {
error = output.error || 'Unknown error'; error = output.error || 'Unknown error';
@@ -165,7 +171,11 @@ export function startSchedulerLoop(deps: SchedulerDependencies): void {
continue; continue;
} }
await runTask(currentTask, deps); deps.queue.enqueueTask(
currentTask.chat_jid,
currentTask.id,
() => runTask(currentTask, deps),
);
} }
} catch (err) { } catch (err) {
logger.error({ err }, 'Error in scheduler loop'); logger.error({ err }, 'Error in scheduler loop');

View File

@@ -30,7 +30,6 @@ export interface AllowedRoot {
export interface ContainerConfig { export interface ContainerConfig {
additionalMounts?: AdditionalMount[]; additionalMounts?: AdditionalMount[];
timeout?: number; // Default: 300000 (5 minutes) timeout?: number; // Default: 300000 (5 minutes)
env?: Record<string, string>;
} }
export interface RegisteredGroup { export interface RegisteredGroup {
@@ -41,10 +40,6 @@ export interface RegisteredGroup {
containerConfig?: ContainerConfig; containerConfig?: ContainerConfig;
} }
export interface Session {
[folder: string]: string;
}
export interface NewMessage { export interface NewMessage {
id: string; id: string;
chat_jid: string; chat_jid: string;

View File

@@ -1,18 +0,0 @@
import fs from 'fs';
import path from 'path';
export function loadJson<T>(filePath: string, defaultValue: T): T {
try {
if (fs.existsSync(filePath)) {
return JSON.parse(fs.readFileSync(filePath, 'utf-8'));
}
} catch {
// Return default on error
}
return defaultValue;
}
export function saveJson(filePath: string, data: unknown): void {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
fs.writeFileSync(filePath, JSON.stringify(data, null, 2));
}