import { exec, execSync } from 'child_process'; import fs from 'fs'; import path from 'path'; import pino from 'pino'; import makeWASocket, { DisconnectReason, WASocket, makeCacheableSignalKeyStore, useMultiFileAuthState, } from '@whiskeysockets/baileys'; import { ASSISTANT_NAME, DATA_DIR, IPC_POLL_INTERVAL, MAIN_GROUP_FOLDER, POLL_INTERVAL, STORE_DIR, TIMEZONE, TRIGGER_PATTERN, } from './config.js'; import { AvailableGroup, runContainerAgent, writeGroupsSnapshot, writeTasksSnapshot, } from './container-runner.js'; import { getAllChats, getAllTasks, getLastGroupSync, getMessagesSince, getNewMessages, getTaskById, initDatabase, setLastGroupSync, storeChatMetadata, storeMessage, updateChatName, } from './db.js'; import { startSchedulerLoop } from './task-scheduler.js'; import { NewMessage, RegisteredGroup, Session } from './types.js'; import { loadJson, saveJson } from './utils.js'; const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours const logger = pino({ level: process.env.LOG_LEVEL || 'info', transport: { target: 'pino-pretty', options: { colorize: true } }, }); let sock: WASocket; let lastTimestamp = ''; let sessions: Session = {}; let registeredGroups: Record = {}; let lastAgentTimestamp: Record = {}; async function setTyping(jid: string, isTyping: boolean): Promise { try { await sock.sendPresenceUpdate(isTyping ? 'composing' : 'paused', jid); } catch (err) { logger.debug({ jid, err }, 'Failed to update typing status'); } } function loadState(): void { const statePath = path.join(DATA_DIR, 'router_state.json'); const state = loadJson<{ last_timestamp?: string; last_agent_timestamp?: Record; }>(statePath, {}); lastTimestamp = state.last_timestamp || ''; lastAgentTimestamp = state.last_agent_timestamp || {}; sessions = loadJson(path.join(DATA_DIR, 'sessions.json'), {}); registeredGroups = loadJson( path.join(DATA_DIR, 'registered_groups.json'), {}, ); logger.info( { groupCount: Object.keys(registeredGroups).length }, 'State loaded', ); } function saveState(): void { saveJson(path.join(DATA_DIR, 'router_state.json'), { last_timestamp: lastTimestamp, last_agent_timestamp: lastAgentTimestamp, }); saveJson(path.join(DATA_DIR, 'sessions.json'), sessions); } function registerGroup(jid: string, group: RegisteredGroup): void { registeredGroups[jid] = group; saveJson(path.join(DATA_DIR, 'registered_groups.json'), registeredGroups); // Create group folder const groupDir = path.join(DATA_DIR, '..', 'groups', group.folder); fs.mkdirSync(path.join(groupDir, 'logs'), { recursive: true }); logger.info( { jid, name: group.name, folder: group.folder }, 'Group registered', ); } /** * Sync group metadata from WhatsApp. * Fetches all participating groups and stores their names in the database. * Called on startup, daily, and on-demand via IPC. */ async function syncGroupMetadata(force = false): Promise { // Check if we need to sync (skip if synced recently, unless forced) if (!force) { const lastSync = getLastGroupSync(); if (lastSync) { const lastSyncTime = new Date(lastSync).getTime(); const now = Date.now(); if (now - lastSyncTime < GROUP_SYNC_INTERVAL_MS) { logger.debug({ lastSync }, 'Skipping group sync - synced recently'); return; } } } try { logger.info('Syncing group metadata from WhatsApp...'); const groups = await sock.groupFetchAllParticipating(); let count = 0; for (const [jid, metadata] of Object.entries(groups)) { if (metadata.subject) { updateChatName(jid, metadata.subject); count++; } } setLastGroupSync(); logger.info({ count }, 'Group metadata synced'); } catch (err) { logger.error({ err }, 'Failed to sync group metadata'); } } /** * Get available groups list for the agent. * Returns groups ordered by most recent activity. */ function getAvailableGroups(): AvailableGroup[] { const chats = getAllChats(); const registeredJids = new Set(Object.keys(registeredGroups)); return chats .filter((c) => c.jid !== '__group_sync__' && c.jid.endsWith('@g.us')) .map((c) => ({ jid: c.jid, name: c.name, lastActivity: c.last_message_time, isRegistered: registeredJids.has(c.jid), })); } async function processMessage(msg: NewMessage): Promise { const group = registeredGroups[msg.chat_jid]; if (!group) return; const content = msg.content.trim(); const isMainGroup = group.folder === MAIN_GROUP_FOLDER; // Main group responds to all messages; other groups require trigger prefix if (!isMainGroup && !TRIGGER_PATTERN.test(content)) return; // Get all messages since last agent interaction so the session has full context const sinceTimestamp = lastAgentTimestamp[msg.chat_jid] || ''; const missedMessages = getMessagesSince( msg.chat_jid, sinceTimestamp, ASSISTANT_NAME, ); const lines = missedMessages.map((m) => { // Escape XML special characters in content const escapeXml = (s: string) => s .replace(/&/g, '&') .replace(//g, '>') .replace(/"/g, '"'); return `${escapeXml(m.content)}`; }); const prompt = `\n${lines.join('\n')}\n`; if (!prompt) return; logger.info( { group: group.name, messageCount: missedMessages.length }, 'Processing message', ); await setTyping(msg.chat_jid, true); const response = await runAgent(group, prompt, msg.chat_jid); await setTyping(msg.chat_jid, false); if (response) { lastAgentTimestamp[msg.chat_jid] = msg.timestamp; await sendMessage(msg.chat_jid, `${ASSISTANT_NAME}: ${response}`); } } async function runAgent( group: RegisteredGroup, prompt: string, chatJid: string, ): Promise { const isMain = group.folder === MAIN_GROUP_FOLDER; const sessionId = sessions[group.folder]; // Update tasks snapshot for container to read (filtered by group) const tasks = getAllTasks(); writeTasksSnapshot( group.folder, isMain, tasks.map((t) => ({ id: t.id, groupFolder: t.group_folder, prompt: t.prompt, schedule_type: t.schedule_type, schedule_value: t.schedule_value, status: t.status, next_run: t.next_run, })), ); // Update available groups snapshot (main group only can see all groups) const availableGroups = getAvailableGroups(); writeGroupsSnapshot( group.folder, isMain, availableGroups, new Set(Object.keys(registeredGroups)), ); try { const output = await runContainerAgent(group, { prompt, sessionId, groupFolder: group.folder, chatJid, isMain, }); if (output.newSessionId) { sessions[group.folder] = output.newSessionId; saveJson(path.join(DATA_DIR, 'sessions.json'), sessions); } if (output.status === 'error') { logger.error( { group: group.name, error: output.error }, 'Container agent error', ); return null; } return output.result; } catch (err) { logger.error({ group: group.name, err }, 'Agent error'); return null; } } async function sendMessage(jid: string, text: string): Promise { try { await sock.sendMessage(jid, { text }); logger.info({ jid, length: text.length }, 'Message sent'); } catch (err) { logger.error({ jid, err }, 'Failed to send message'); } } function startIpcWatcher(): void { const ipcBaseDir = path.join(DATA_DIR, 'ipc'); fs.mkdirSync(ipcBaseDir, { recursive: true }); const processIpcFiles = async () => { // Scan all group IPC directories (identity determined by directory) let groupFolders: string[]; try { groupFolders = fs.readdirSync(ipcBaseDir).filter((f) => { const stat = fs.statSync(path.join(ipcBaseDir, f)); return stat.isDirectory() && f !== 'errors'; }); } catch (err) { logger.error({ err }, 'Error reading IPC base directory'); setTimeout(processIpcFiles, IPC_POLL_INTERVAL); return; } for (const sourceGroup of groupFolders) { const isMain = sourceGroup === MAIN_GROUP_FOLDER; const messagesDir = path.join(ipcBaseDir, sourceGroup, 'messages'); const tasksDir = path.join(ipcBaseDir, sourceGroup, 'tasks'); // Process messages from this group's IPC directory try { if (fs.existsSync(messagesDir)) { const messageFiles = fs .readdirSync(messagesDir) .filter((f) => f.endsWith('.json')); for (const file of messageFiles) { const filePath = path.join(messagesDir, file); try { const data = JSON.parse(fs.readFileSync(filePath, 'utf-8')); if (data.type === 'message' && data.chatJid && data.text) { // Authorization: verify this group can send to this chatJid const targetGroup = registeredGroups[data.chatJid]; if ( isMain || (targetGroup && targetGroup.folder === sourceGroup) ) { await sendMessage( data.chatJid, `${ASSISTANT_NAME}: ${data.text}`, ); logger.info( { chatJid: data.chatJid, sourceGroup }, 'IPC message sent', ); } else { logger.warn( { chatJid: data.chatJid, sourceGroup }, 'Unauthorized IPC message attempt blocked', ); } } fs.unlinkSync(filePath); } catch (err) { logger.error( { file, sourceGroup, err }, 'Error processing IPC message', ); const errorDir = path.join(ipcBaseDir, 'errors'); fs.mkdirSync(errorDir, { recursive: true }); fs.renameSync( filePath, path.join(errorDir, `${sourceGroup}-${file}`), ); } } } } catch (err) { logger.error( { err, sourceGroup }, 'Error reading IPC messages directory', ); } // Process tasks from this group's IPC directory try { if (fs.existsSync(tasksDir)) { const taskFiles = fs .readdirSync(tasksDir) .filter((f) => f.endsWith('.json')); for (const file of taskFiles) { const filePath = path.join(tasksDir, file); try { const data = JSON.parse(fs.readFileSync(filePath, 'utf-8')); // Pass source group identity to processTaskIpc for authorization await processTaskIpc(data, sourceGroup, isMain); fs.unlinkSync(filePath); } catch (err) { logger.error( { file, sourceGroup, err }, 'Error processing IPC task', ); const errorDir = path.join(ipcBaseDir, 'errors'); fs.mkdirSync(errorDir, { recursive: true }); fs.renameSync( filePath, path.join(errorDir, `${sourceGroup}-${file}`), ); } } } } catch (err) { logger.error({ err, sourceGroup }, 'Error reading IPC tasks directory'); } } setTimeout(processIpcFiles, IPC_POLL_INTERVAL); }; processIpcFiles(); logger.info('IPC watcher started (per-group namespaces)'); } async function processTaskIpc( data: { type: string; taskId?: string; prompt?: string; schedule_type?: string; schedule_value?: string; context_mode?: string; groupFolder?: string; chatJid?: string; // For register_group jid?: string; name?: string; folder?: string; trigger?: string; containerConfig?: RegisteredGroup['containerConfig']; }, sourceGroup: string, // Verified identity from IPC directory isMain: boolean, // Verified from directory path ): Promise { // Import db functions dynamically to avoid circular deps const { createTask, updateTask, deleteTask, getTaskById: getTask, } = await import('./db.js'); const { CronExpressionParser } = await import('cron-parser'); switch (data.type) { case 'schedule_task': if ( data.prompt && data.schedule_type && data.schedule_value && data.groupFolder ) { // Authorization: non-main groups can only schedule for themselves const targetGroup = data.groupFolder; if (!isMain && targetGroup !== sourceGroup) { logger.warn( { sourceGroup, targetGroup }, 'Unauthorized schedule_task attempt blocked', ); break; } // Resolve the correct JID for the target group (don't trust IPC payload) const targetJid = Object.entries(registeredGroups).find( ([, group]) => group.folder === targetGroup, )?.[0]; if (!targetJid) { logger.warn( { targetGroup }, 'Cannot schedule task: target group not registered', ); break; } const scheduleType = data.schedule_type as 'cron' | 'interval' | 'once'; let nextRun: string | null = null; if (scheduleType === 'cron') { try { const interval = CronExpressionParser.parse(data.schedule_value, { tz: TIMEZONE, }); nextRun = interval.next().toISOString(); } catch { logger.warn( { scheduleValue: data.schedule_value }, 'Invalid cron expression', ); break; } } else if (scheduleType === 'interval') { const ms = parseInt(data.schedule_value, 10); if (isNaN(ms) || ms <= 0) { logger.warn( { scheduleValue: data.schedule_value }, 'Invalid interval', ); break; } nextRun = new Date(Date.now() + ms).toISOString(); } else if (scheduleType === 'once') { const scheduled = new Date(data.schedule_value); if (isNaN(scheduled.getTime())) { logger.warn( { scheduleValue: data.schedule_value }, 'Invalid timestamp', ); break; } nextRun = scheduled.toISOString(); } const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; const contextMode = data.context_mode === 'group' || data.context_mode === 'isolated' ? data.context_mode : 'isolated'; createTask({ id: taskId, group_folder: targetGroup, chat_jid: targetJid, prompt: data.prompt, schedule_type: scheduleType, schedule_value: data.schedule_value, context_mode: contextMode, next_run: nextRun, status: 'active', created_at: new Date().toISOString(), }); logger.info( { taskId, sourceGroup, targetGroup, contextMode }, 'Task created via IPC', ); } break; case 'pause_task': if (data.taskId) { const task = getTask(data.taskId); if (task && (isMain || task.group_folder === sourceGroup)) { updateTask(data.taskId, { status: 'paused' }); logger.info( { taskId: data.taskId, sourceGroup }, 'Task paused via IPC', ); } else { logger.warn( { taskId: data.taskId, sourceGroup }, 'Unauthorized task pause attempt', ); } } break; case 'resume_task': if (data.taskId) { const task = getTask(data.taskId); if (task && (isMain || task.group_folder === sourceGroup)) { updateTask(data.taskId, { status: 'active' }); logger.info( { taskId: data.taskId, sourceGroup }, 'Task resumed via IPC', ); } else { logger.warn( { taskId: data.taskId, sourceGroup }, 'Unauthorized task resume attempt', ); } } break; case 'cancel_task': if (data.taskId) { const task = getTask(data.taskId); if (task && (isMain || task.group_folder === sourceGroup)) { deleteTask(data.taskId); logger.info( { taskId: data.taskId, sourceGroup }, 'Task cancelled via IPC', ); } else { logger.warn( { taskId: data.taskId, sourceGroup }, 'Unauthorized task cancel attempt', ); } } break; case 'refresh_groups': // Only main group can request a refresh if (isMain) { logger.info( { sourceGroup }, 'Group metadata refresh requested via IPC', ); await syncGroupMetadata(true); // Write updated snapshot immediately const availableGroups = getAvailableGroups(); const { writeGroupsSnapshot: writeGroups } = await import('./container-runner.js'); writeGroups( sourceGroup, true, availableGroups, new Set(Object.keys(registeredGroups)), ); } else { logger.warn( { sourceGroup }, 'Unauthorized refresh_groups attempt blocked', ); } break; case 'register_group': // Only main group can register new groups if (!isMain) { logger.warn( { sourceGroup }, 'Unauthorized register_group attempt blocked', ); break; } if (data.jid && data.name && data.folder && data.trigger) { registerGroup(data.jid, { name: data.name, folder: data.folder, trigger: data.trigger, added_at: new Date().toISOString(), containerConfig: data.containerConfig, }); } else { logger.warn( { data }, 'Invalid register_group request - missing required fields', ); } break; default: logger.warn({ type: data.type }, 'Unknown IPC task type'); } } async function connectWhatsApp(): Promise { const authDir = path.join(STORE_DIR, 'auth'); fs.mkdirSync(authDir, { recursive: true }); const { state, saveCreds } = await useMultiFileAuthState(authDir); sock = makeWASocket({ auth: { creds: state.creds, keys: makeCacheableSignalKeyStore(state.keys, logger), }, printQRInTerminal: false, logger, browser: ['NanoClaw', 'Chrome', '1.0.0'], }); sock.ev.on('connection.update', (update) => { const { connection, lastDisconnect, qr } = update; if (qr) { const msg = 'WhatsApp authentication required. Run /setup in Claude Code.'; logger.error(msg); exec( `osascript -e 'display notification "${msg}" with title "NanoClaw" sound name "Basso"'`, ); setTimeout(() => process.exit(1), 1000); } if (connection === 'close') { const reason = (lastDisconnect?.error as any)?.output?.statusCode; const shouldReconnect = reason !== DisconnectReason.loggedOut; logger.info({ reason, shouldReconnect }, 'Connection closed'); if (shouldReconnect) { logger.info('Reconnecting...'); connectWhatsApp(); } else { logger.info('Logged out. Run /setup to re-authenticate.'); process.exit(0); } } else if (connection === 'open') { logger.info('Connected to WhatsApp'); // Sync group metadata on startup (respects 24h cache) syncGroupMetadata().catch((err) => logger.error({ err }, 'Initial group sync failed'), ); // Set up daily sync timer setInterval(() => { syncGroupMetadata().catch((err) => logger.error({ err }, 'Periodic group sync failed'), ); }, GROUP_SYNC_INTERVAL_MS); startSchedulerLoop({ sendMessage, registeredGroups: () => registeredGroups, getSessions: () => sessions, }); startIpcWatcher(); startMessageLoop(); } }); sock.ev.on('creds.update', saveCreds); sock.ev.on('messages.upsert', ({ messages }) => { for (const msg of messages) { if (!msg.message) continue; const chatJid = msg.key.remoteJid; if (!chatJid || chatJid === 'status@broadcast') continue; const timestamp = new Date( Number(msg.messageTimestamp) * 1000, ).toISOString(); // Always store chat metadata for group discovery storeChatMetadata(chatJid, timestamp); // Only store full message content for registered groups if (registeredGroups[chatJid]) { storeMessage( msg, chatJid, msg.key.fromMe || false, msg.pushName || undefined, ); } } }); } async function startMessageLoop(): Promise { logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`); while (true) { try { const jids = Object.keys(registeredGroups); const { messages } = getNewMessages(jids, lastTimestamp, ASSISTANT_NAME); if (messages.length > 0) logger.info({ count: messages.length }, 'New messages'); for (const msg of messages) { try { await processMessage(msg); // Only advance timestamp after successful processing for at-least-once delivery lastTimestamp = msg.timestamp; saveState(); } catch (err) { logger.error( { err, msg: msg.id }, 'Error processing message, will retry', ); // Stop processing this batch - failed message will be retried next loop break; } } } catch (err) { logger.error({ err }, 'Error in message loop'); } await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL)); } } function ensureContainerSystemRunning(): void { try { execSync('container system status', { stdio: 'pipe' }); logger.debug('Apple Container system already running'); } catch { logger.info('Starting Apple Container system...'); try { execSync('container system start', { stdio: 'pipe', timeout: 30000 }); logger.info('Apple Container system started'); } catch (err) { logger.error({ err }, 'Failed to start Apple Container system'); console.error( '\n╔════════════════════════════════════════════════════════════════╗', ); console.error( '║ FATAL: Apple Container system failed to start ║', ); console.error( '║ ║', ); console.error( '║ Agents cannot run without Apple Container. To fix: ║', ); console.error( '║ 1. Install from: https://github.com/apple/container/releases ║', ); console.error( '║ 2. Run: container system start ║', ); console.error( '║ 3. Restart NanoClaw ║', ); console.error( '╚════════════════════════════════════════════════════════════════╝\n', ); throw new Error('Apple Container system is required but failed to start'); } } } async function main(): Promise { ensureContainerSystemRunning(); initDatabase(); logger.info('Database initialized'); loadState(); await connectWhatsApp(); } main().catch((err) => { logger.error({ err }, 'Failed to start NanoClaw'); process.exit(1); });