Add containerized agent execution with Apple Container
- Agents run in isolated Linux VMs via Apple Container - All groups get Bash access (safe - sandboxed in container) - Browser automation via agent-browser + Chromium - Per-group configurable additional directory mounts - File-based IPC for messages and scheduled tasks - Container image with Node.js 22, Chromium, agent-browser Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
215
src/index.ts
215
src/index.ts
@@ -4,7 +4,6 @@ import makeWASocket, {
|
||||
makeCacheableSignalKeyStore,
|
||||
WASocket
|
||||
} from '@whiskeysockets/baileys';
|
||||
import { query } from '@anthropic-ai/claude-agent-sdk';
|
||||
import pino from 'pino';
|
||||
import { exec } from 'child_process';
|
||||
import fs from 'fs';
|
||||
@@ -18,12 +17,13 @@ import {
|
||||
DATA_DIR,
|
||||
TRIGGER_PATTERN,
|
||||
CLEAR_COMMAND,
|
||||
MAIN_GROUP_FOLDER
|
||||
MAIN_GROUP_FOLDER,
|
||||
IPC_POLL_INTERVAL
|
||||
} from './config.js';
|
||||
import { RegisteredGroup, Session, NewMessage } from './types.js';
|
||||
import { initDatabase, storeMessage, getNewMessages, getMessagesSince } from './db.js';
|
||||
import { createSchedulerMcp } from './scheduler-mcp.js';
|
||||
import { initDatabase, storeMessage, getNewMessages, getMessagesSince, getAllTasks } from './db.js';
|
||||
import { startSchedulerLoop } from './scheduler.js';
|
||||
import { runContainerAgent, writeTasksSnapshot } from './container-runner.js';
|
||||
|
||||
const logger = pino({
|
||||
level: process.env.LOG_LEVEL || 'info',
|
||||
@@ -118,59 +118,46 @@ async function processMessage(msg: NewMessage): Promise<void> {
|
||||
}
|
||||
|
||||
async function runAgent(group: RegisteredGroup, prompt: string, chatJid: string): Promise<string | null> {
|
||||
const groupDir = path.join(GROUPS_DIR, group.folder);
|
||||
fs.mkdirSync(groupDir, { recursive: true });
|
||||
|
||||
const isMain = group.folder === MAIN_GROUP_FOLDER;
|
||||
const sessionId = sessions[group.folder];
|
||||
let newSessionId: string | undefined;
|
||||
let result: string | null = null;
|
||||
|
||||
// Create scheduler MCP with current group context
|
||||
const schedulerMcp = createSchedulerMcp({
|
||||
groupFolder: group.folder,
|
||||
chatJid,
|
||||
isMain,
|
||||
sendMessage
|
||||
});
|
||||
|
||||
// Main channel gets Bash access for admin tasks (querying DB, etc.)
|
||||
const baseTools = ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch', 'mcp__nanoclaw__*', 'mcp__gmail__*'];
|
||||
const allowedTools = isMain ? [...baseTools, 'Bash'] : baseTools;
|
||||
// Update tasks snapshot for container to read
|
||||
const tasks = getAllTasks();
|
||||
writeTasksSnapshot(tasks.map(t => ({
|
||||
id: t.id,
|
||||
groupFolder: t.group_folder,
|
||||
prompt: t.prompt,
|
||||
schedule_type: t.schedule_type,
|
||||
schedule_value: t.schedule_value,
|
||||
status: t.status,
|
||||
next_run: t.next_run
|
||||
})));
|
||||
|
||||
try {
|
||||
for await (const message of query({
|
||||
const output = await runContainerAgent(group, {
|
||||
prompt,
|
||||
options: {
|
||||
cwd: groupDir,
|
||||
resume: sessionId,
|
||||
allowedTools,
|
||||
permissionMode: 'bypassPermissions',
|
||||
settingSources: ['project'],
|
||||
mcpServers: {
|
||||
nanoclaw: schedulerMcp,
|
||||
gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] }
|
||||
}
|
||||
}
|
||||
})) {
|
||||
if (message.type === 'system' && message.subtype === 'init') {
|
||||
newSessionId = message.session_id;
|
||||
}
|
||||
if ('result' in message && message.result) {
|
||||
result = message.result as string;
|
||||
}
|
||||
sessionId,
|
||||
groupFolder: group.folder,
|
||||
chatJid,
|
||||
isMain
|
||||
});
|
||||
|
||||
// Update session if changed
|
||||
if (output.newSessionId) {
|
||||
sessions[group.folder] = output.newSessionId;
|
||||
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
|
||||
}
|
||||
|
||||
if (output.status === 'error') {
|
||||
logger.error({ group: group.name, error: output.error }, 'Container agent error');
|
||||
return null;
|
||||
}
|
||||
|
||||
return output.result;
|
||||
} catch (err) {
|
||||
logger.error({ group: group.name, err }, 'Agent error');
|
||||
return null;
|
||||
}
|
||||
|
||||
if (newSessionId) {
|
||||
sessions[group.folder] = newSessionId;
|
||||
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async function sendMessage(jid: string, text: string): Promise<void> {
|
||||
@@ -182,6 +169,139 @@ async function sendMessage(jid: string, text: string): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
// IPC watcher for container messages and tasks
|
||||
function startIpcWatcher(): void {
|
||||
const messagesDir = path.join(DATA_DIR, 'ipc', 'messages');
|
||||
const tasksDir = path.join(DATA_DIR, 'ipc', 'tasks');
|
||||
|
||||
fs.mkdirSync(messagesDir, { recursive: true });
|
||||
fs.mkdirSync(tasksDir, { recursive: true });
|
||||
|
||||
const processIpcFiles = async () => {
|
||||
// Process pending messages
|
||||
try {
|
||||
const messageFiles = fs.readdirSync(messagesDir).filter(f => f.endsWith('.json'));
|
||||
for (const file of messageFiles) {
|
||||
const filePath = path.join(messagesDir, file);
|
||||
try {
|
||||
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
|
||||
if (data.type === 'message' && data.chatJid && data.text) {
|
||||
await sendMessage(data.chatJid, `${ASSISTANT_NAME}: ${data.text}`);
|
||||
logger.info({ chatJid: data.chatJid }, 'IPC message sent');
|
||||
}
|
||||
fs.unlinkSync(filePath);
|
||||
} catch (err) {
|
||||
logger.error({ file, err }, 'Error processing IPC message');
|
||||
// Move to error directory instead of deleting
|
||||
const errorDir = path.join(DATA_DIR, 'ipc', 'errors');
|
||||
fs.mkdirSync(errorDir, { recursive: true });
|
||||
fs.renameSync(filePath, path.join(errorDir, file));
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Error reading IPC messages directory');
|
||||
}
|
||||
|
||||
// Process pending task operations
|
||||
try {
|
||||
const taskFiles = fs.readdirSync(tasksDir).filter(f => f.endsWith('.json'));
|
||||
for (const file of taskFiles) {
|
||||
const filePath = path.join(tasksDir, file);
|
||||
try {
|
||||
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
|
||||
await processTaskIpc(data);
|
||||
fs.unlinkSync(filePath);
|
||||
} catch (err) {
|
||||
logger.error({ file, err }, 'Error processing IPC task');
|
||||
const errorDir = path.join(DATA_DIR, 'ipc', 'errors');
|
||||
fs.mkdirSync(errorDir, { recursive: true });
|
||||
fs.renameSync(filePath, path.join(errorDir, file));
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Error reading IPC tasks directory');
|
||||
}
|
||||
|
||||
setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
|
||||
};
|
||||
|
||||
processIpcFiles();
|
||||
logger.info('IPC watcher started');
|
||||
}
|
||||
|
||||
async function processTaskIpc(data: {
|
||||
type: string;
|
||||
taskId?: string;
|
||||
prompt?: string;
|
||||
schedule_type?: string;
|
||||
schedule_value?: string;
|
||||
groupFolder?: string;
|
||||
chatJid?: string;
|
||||
isMain?: boolean;
|
||||
}): Promise<void> {
|
||||
// Import db functions dynamically to avoid circular deps
|
||||
const { createTask, updateTask, deleteTask } = await import('./db.js');
|
||||
const { CronExpressionParser } = await import('cron-parser');
|
||||
|
||||
switch (data.type) {
|
||||
case 'schedule_task':
|
||||
if (data.prompt && data.schedule_type && data.schedule_value && data.groupFolder && data.chatJid) {
|
||||
const scheduleType = data.schedule_type as 'cron' | 'interval' | 'once';
|
||||
|
||||
// Calculate next run time
|
||||
let nextRun: string | null = null;
|
||||
if (scheduleType === 'cron') {
|
||||
const interval = CronExpressionParser.parse(data.schedule_value);
|
||||
nextRun = interval.next().toISOString();
|
||||
} else if (scheduleType === 'interval') {
|
||||
const ms = parseInt(data.schedule_value, 10);
|
||||
nextRun = new Date(Date.now() + ms).toISOString();
|
||||
} else if (scheduleType === 'once') {
|
||||
nextRun = data.schedule_value; // ISO timestamp
|
||||
}
|
||||
|
||||
const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
|
||||
createTask({
|
||||
id: taskId,
|
||||
group_folder: data.groupFolder,
|
||||
chat_jid: data.chatJid,
|
||||
prompt: data.prompt,
|
||||
schedule_type: scheduleType,
|
||||
schedule_value: data.schedule_value,
|
||||
next_run: nextRun,
|
||||
status: 'active',
|
||||
created_at: new Date().toISOString()
|
||||
});
|
||||
logger.info({ taskId, groupFolder: data.groupFolder }, 'Task created via IPC');
|
||||
}
|
||||
break;
|
||||
|
||||
case 'pause_task':
|
||||
if (data.taskId) {
|
||||
updateTask(data.taskId, { status: 'paused' });
|
||||
logger.info({ taskId: data.taskId }, 'Task paused via IPC');
|
||||
}
|
||||
break;
|
||||
|
||||
case 'resume_task':
|
||||
if (data.taskId) {
|
||||
updateTask(data.taskId, { status: 'active' });
|
||||
logger.info({ taskId: data.taskId }, 'Task resumed via IPC');
|
||||
}
|
||||
break;
|
||||
|
||||
case 'cancel_task':
|
||||
if (data.taskId) {
|
||||
deleteTask(data.taskId);
|
||||
logger.info({ taskId: data.taskId }, 'Task cancelled via IPC');
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
logger.warn({ type: data.type }, 'Unknown IPC task type');
|
||||
}
|
||||
}
|
||||
|
||||
async function connectWhatsApp(): Promise<void> {
|
||||
const authDir = path.join(STORE_DIR, 'auth');
|
||||
fs.mkdirSync(authDir, { recursive: true });
|
||||
@@ -219,7 +339,8 @@ async function connectWhatsApp(): Promise<void> {
|
||||
}
|
||||
} else if (connection === 'open') {
|
||||
logger.info('Connected to WhatsApp');
|
||||
startSchedulerLoop({ sendMessage });
|
||||
startSchedulerLoop({ sendMessage, registeredGroups: () => registeredGroups });
|
||||
startIpcWatcher();
|
||||
startMessageLoop();
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user