Add built-in scheduler with group-scoped tasks

- Custom nanoclaw MCP server with scheduling tools (schedule_task,
  list_tasks, get_task, update_task, pause/resume/cancel_task, send_message)
- Tasks run as full agents in their group's context
- Support for cron, interval, and one-time schedules
- Task run logging with duration and results
- Main channel has Bash access for admin tasks (query DB, manage groups)
- Other groups restricted to file operations only
- Updated docs and requirements

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-01-31 21:00:37 +02:00
parent 423d45c52e
commit fa13b14dae
12 changed files with 772 additions and 55 deletions

View File

@@ -1,8 +1,10 @@
export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy';
export const POLL_INTERVAL = 2000;
export const SCHEDULER_POLL_INTERVAL = 60000; // Check for due tasks every minute
export const STORE_DIR = './store';
export const GROUPS_DIR = './groups';
export const DATA_DIR = './data';
export const MAIN_GROUP_FOLDER = 'main';
export const TRIGGER_PATTERN = new RegExp(`^@${ASSISTANT_NAME}\\b`, 'i');
export const CLEAR_COMMAND = '/clear';

117
src/db.ts
View File

@@ -2,7 +2,7 @@ import Database from 'better-sqlite3';
import fs from 'fs';
import path from 'path';
import { proto } from '@whiskeysockets/baileys';
import { NewMessage } from './types.js';
import { NewMessage, ScheduledTask, TaskRunLog } from './types.js';
import { STORE_DIR } from './config.js';
let db: Database.Database;
@@ -30,6 +30,34 @@ export function initDatabase(): void {
FOREIGN KEY (chat_jid) REFERENCES chats(jid)
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
CREATE TABLE IF NOT EXISTS scheduled_tasks (
id TEXT PRIMARY KEY,
group_folder TEXT NOT NULL,
chat_jid TEXT NOT NULL,
prompt TEXT NOT NULL,
schedule_type TEXT NOT NULL,
schedule_value TEXT NOT NULL,
next_run TEXT,
last_run TEXT,
last_result TEXT,
status TEXT DEFAULT 'active',
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_next_run ON scheduled_tasks(next_run);
CREATE INDEX IF NOT EXISTS idx_status ON scheduled_tasks(status);
CREATE TABLE IF NOT EXISTS task_run_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
run_at TEXT NOT NULL,
duration_ms INTEGER NOT NULL,
status TEXT NOT NULL,
result TEXT,
error TEXT,
FOREIGN KEY (task_id) REFERENCES scheduled_tasks(id)
);
CREATE INDEX IF NOT EXISTS idx_task_run_logs ON task_run_logs(task_id, run_at);
`);
// Add sender_name column if it doesn't exist (migration for existing DBs)
@@ -89,3 +117,90 @@ export function getMessagesSince(chatJid: string, sinceTimestamp: string): NewMe
`;
return db.prepare(sql).all(chatJid, sinceTimestamp) as NewMessage[];
}
// Scheduled Tasks
export function createTask(task: Omit<ScheduledTask, 'last_run' | 'last_result'>): void {
db.prepare(`
INSERT INTO scheduled_tasks (id, group_folder, chat_jid, prompt, schedule_type, schedule_value, next_run, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
task.id,
task.group_folder,
task.chat_jid,
task.prompt,
task.schedule_type,
task.schedule_value,
task.next_run,
task.status,
task.created_at
);
}
export function getTaskById(id: string): ScheduledTask | undefined {
return db.prepare('SELECT * FROM scheduled_tasks WHERE id = ?').get(id) as ScheduledTask | undefined;
}
export function getTasksForGroup(groupFolder: string): ScheduledTask[] {
return db.prepare('SELECT * FROM scheduled_tasks WHERE group_folder = ? ORDER BY created_at DESC').all(groupFolder) as ScheduledTask[];
}
export function getAllTasks(): ScheduledTask[] {
return db.prepare('SELECT * FROM scheduled_tasks ORDER BY created_at DESC').all() as ScheduledTask[];
}
export function updateTask(id: string, updates: Partial<Pick<ScheduledTask, 'prompt' | 'schedule_type' | 'schedule_value' | 'next_run' | 'status'>>): void {
const fields: string[] = [];
const values: unknown[] = [];
if (updates.prompt !== undefined) { fields.push('prompt = ?'); values.push(updates.prompt); }
if (updates.schedule_type !== undefined) { fields.push('schedule_type = ?'); values.push(updates.schedule_type); }
if (updates.schedule_value !== undefined) { fields.push('schedule_value = ?'); values.push(updates.schedule_value); }
if (updates.next_run !== undefined) { fields.push('next_run = ?'); values.push(updates.next_run); }
if (updates.status !== undefined) { fields.push('status = ?'); values.push(updates.status); }
if (fields.length === 0) return;
values.push(id);
db.prepare(`UPDATE scheduled_tasks SET ${fields.join(', ')} WHERE id = ?`).run(...values);
}
export function deleteTask(id: string): void {
db.prepare('DELETE FROM scheduled_tasks WHERE id = ?').run(id);
db.prepare('DELETE FROM task_run_logs WHERE task_id = ?').run(id);
}
export function getDueTasks(): ScheduledTask[] {
const now = new Date().toISOString();
return db.prepare(`
SELECT * FROM scheduled_tasks
WHERE status = 'active' AND next_run IS NOT NULL AND next_run <= ?
ORDER BY next_run
`).all(now) as ScheduledTask[];
}
export function updateTaskAfterRun(id: string, nextRun: string | null, lastResult: string): void {
const now = new Date().toISOString();
db.prepare(`
UPDATE scheduled_tasks
SET next_run = ?, last_run = ?, last_result = ?, status = CASE WHEN ? IS NULL THEN 'completed' ELSE status END
WHERE id = ?
`).run(nextRun, now, lastResult, nextRun, id);
}
export function logTaskRun(log: TaskRunLog): void {
db.prepare(`
INSERT INTO task_run_logs (task_id, run_at, duration_ms, status, result, error)
VALUES (?, ?, ?, ?, ?, ?)
`).run(log.task_id, log.run_at, log.duration_ms, log.status, log.result, log.error);
}
export function getTaskRunLogs(taskId: string, limit = 10): TaskRunLog[] {
return db.prepare(`
SELECT task_id, run_at, duration_ms, status, result, error
FROM task_run_logs
WHERE task_id = ?
ORDER BY run_at DESC
LIMIT ?
`).all(taskId, limit) as TaskRunLog[];
}

View File

@@ -17,10 +17,13 @@ import {
GROUPS_DIR,
DATA_DIR,
TRIGGER_PATTERN,
CLEAR_COMMAND
CLEAR_COMMAND,
MAIN_GROUP_FOLDER
} from './config.js';
import { RegisteredGroup, Session, NewMessage } from './types.js';
import { initDatabase, storeMessage, getNewMessages, getMessagesSince } from './db.js';
import { createSchedulerMcp } from './scheduler-mcp.js';
import { startSchedulerLoop } from './scheduler.js';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
@@ -104,7 +107,7 @@ async function processMessage(msg: NewMessage): Promise<void> {
if (!prompt) return;
logger.info({ group: group.name, messageCount: missedMessages.length }, 'Processing message');
const response = await runAgent(group, prompt);
const response = await runAgent(group, prompt, msg.chat_jid);
// Update last agent timestamp
lastAgentTimestamp[msg.chat_jid] = msg.timestamp;
@@ -114,26 +117,39 @@ async function processMessage(msg: NewMessage): Promise<void> {
}
}
async function runAgent(group: RegisteredGroup, prompt: string): Promise<string | null> {
async function runAgent(group: RegisteredGroup, prompt: string, chatJid: string): Promise<string | null> {
const groupDir = path.join(GROUPS_DIR, group.folder);
fs.mkdirSync(groupDir, { recursive: true });
const isMain = group.folder === MAIN_GROUP_FOLDER;
const sessionId = sessions[group.folder];
let newSessionId: string | undefined;
let result: string | null = null;
// Create scheduler MCP with current group context
const schedulerMcp = createSchedulerMcp({
groupFolder: group.folder,
chatJid,
isMain,
sendMessage
});
// Main channel gets Bash access for admin tasks (querying DB, etc.)
const baseTools = ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch', 'mcp__nanoclaw__*', 'mcp__gmail__*'];
const allowedTools = isMain ? [...baseTools, 'Bash'] : baseTools;
try {
for await (const message of query({
prompt,
options: {
cwd: groupDir,
resume: sessionId,
allowedTools: ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch'],
allowedTools,
permissionMode: 'bypassPermissions',
settingSources: ['project'],
mcpServers: {
gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] },
scheduler: { command: 'npx', args: ['-y', 'schedule-task-mcp'] }
nanoclaw: schedulerMcp,
gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] }
}
}
})) {
@@ -203,6 +219,7 @@ async function connectWhatsApp(): Promise<void> {
}
} else if (connection === 'open') {
logger.info('Connected to WhatsApp');
startSchedulerLoop({ sendMessage });
startMessageLoop();
}
});

284
src/scheduler-mcp.ts Normal file
View File

@@ -0,0 +1,284 @@
import { createSdkMcpServer, tool } from '@anthropic-ai/claude-agent-sdk';
import { z } from 'zod';
import { CronExpressionParser } from 'cron-parser';
import {
createTask,
getTaskById,
getTasksForGroup,
getAllTasks,
updateTask,
deleteTask,
getTaskRunLogs
} from './db.js';
import { ScheduledTask } from './types.js';
import { MAIN_GROUP_FOLDER } from './config.js';
function generateTaskId(): string {
return `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
function calculateNextRun(scheduleType: string, scheduleValue: string): string | null {
const now = new Date();
switch (scheduleType) {
case 'cron': {
const interval = CronExpressionParser.parse(scheduleValue);
return interval.next().toISOString();
}
case 'interval': {
const ms = parseInt(scheduleValue, 10);
return new Date(now.getTime() + ms).toISOString();
}
case 'once': {
const runAt = new Date(scheduleValue);
return runAt > now ? runAt.toISOString() : null;
}
default:
return null;
}
}
function formatTask(task: ScheduledTask): string {
const lines = [
`ID: ${task.id}`,
`Group: ${task.group_folder}`,
`Prompt: ${task.prompt}`,
`Schedule: ${task.schedule_type} (${task.schedule_value})`,
`Status: ${task.status}`,
`Next run: ${task.next_run || 'N/A'}`,
`Last run: ${task.last_run || 'Never'}`,
`Last result: ${task.last_result || 'N/A'}`
];
return lines.join('\n');
}
export interface SchedulerMcpContext {
groupFolder: string;
chatJid: string;
isMain: boolean;
sendMessage: (jid: string, text: string) => Promise<void>;
}
export function createSchedulerMcp(ctx: SchedulerMcpContext) {
const { groupFolder, chatJid, isMain, sendMessage } = ctx;
return createSdkMcpServer({
name: 'nanoclaw',
version: '1.0.0',
tools: [
tool(
'schedule_task',
'Schedule a recurring or one-time task. The task will run as an agent in the current group context.',
{
prompt: z.string().describe('The prompt/instruction for the task when it runs'),
schedule_type: z.enum(['cron', 'interval', 'once']).describe('Type of schedule: cron (e.g., "0 9 * * 1" for Mondays at 9am), interval (milliseconds), or once (ISO timestamp)'),
schedule_value: z.string().describe('Schedule value: cron expression, milliseconds for interval, or ISO timestamp for once'),
target_group: z.string().optional().describe('(Main channel only) Target group folder to run the task in. Defaults to current group.')
},
async (args) => {
const targetGroup = isMain && args.target_group ? args.target_group : groupFolder;
const targetJid = isMain && args.target_group ? '' : chatJid; // Will need to look up JID for other groups
// Validate schedule
const nextRun = calculateNextRun(args.schedule_type, args.schedule_value);
if (nextRun === null && args.schedule_type !== 'once') {
return { content: [{ type: 'text', text: 'Error: Invalid schedule. Task would never run.' }] };
}
const task: Omit<ScheduledTask, 'last_run' | 'last_result'> = {
id: generateTaskId(),
group_folder: targetGroup,
chat_jid: targetJid || chatJid,
prompt: args.prompt,
schedule_type: args.schedule_type,
schedule_value: args.schedule_value,
next_run: nextRun,
status: 'active',
created_at: new Date().toISOString()
};
createTask(task);
return {
content: [{
type: 'text',
text: `Task scheduled successfully!\n\n${formatTask(task as ScheduledTask)}`
}]
};
}
),
tool(
'list_tasks',
'List scheduled tasks. Shows tasks for the current group, or all tasks if called from the main channel.',
{},
async () => {
const tasks = isMain ? getAllTasks() : getTasksForGroup(groupFolder);
if (tasks.length === 0) {
return { content: [{ type: 'text', text: 'No scheduled tasks found.' }] };
}
const formatted = tasks.map((t, i) => `--- Task ${i + 1} ---\n${formatTask(t)}`).join('\n\n');
return { content: [{ type: 'text', text: `Found ${tasks.length} task(s):\n\n${formatted}` }] };
}
),
tool(
'get_task',
'Get details about a specific task including run history.',
{
task_id: z.string().describe('The task ID')
},
async (args) => {
const task = getTaskById(args.task_id);
if (!task) {
return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] };
}
// Check permissions
if (!isMain && task.group_folder !== groupFolder) {
return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] };
}
const logs = getTaskRunLogs(args.task_id, 5);
let output = formatTask(task);
if (logs.length > 0) {
output += '\n\n--- Recent Runs ---\n';
output += logs.map(l =>
`${l.run_at}: ${l.status} (${l.duration_ms}ms)${l.error ? ` - ${l.error}` : ''}`
).join('\n');
}
return { content: [{ type: 'text', text: output }] };
}
),
tool(
'update_task',
'Update a scheduled task.',
{
task_id: z.string().describe('The task ID'),
prompt: z.string().optional().describe('New prompt for the task'),
schedule_type: z.enum(['cron', 'interval', 'once']).optional().describe('New schedule type'),
schedule_value: z.string().optional().describe('New schedule value')
},
async (args) => {
const task = getTaskById(args.task_id);
if (!task) {
return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] };
}
if (!isMain && task.group_folder !== groupFolder) {
return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] };
}
const updates: Parameters<typeof updateTask>[1] = {};
if (args.prompt) updates.prompt = args.prompt;
if (args.schedule_type) updates.schedule_type = args.schedule_type;
if (args.schedule_value) updates.schedule_value = args.schedule_value;
// Recalculate next_run if schedule changed
if (args.schedule_type || args.schedule_value) {
const schedType = args.schedule_type || task.schedule_type;
const schedValue = args.schedule_value || task.schedule_value;
updates.next_run = calculateNextRun(schedType, schedValue);
}
updateTask(args.task_id, updates);
const updated = getTaskById(args.task_id)!;
return { content: [{ type: 'text', text: `Task updated!\n\n${formatTask(updated)}` }] };
}
),
tool(
'pause_task',
'Pause a scheduled task.',
{
task_id: z.string().describe('The task ID')
},
async (args) => {
const task = getTaskById(args.task_id);
if (!task) {
return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] };
}
if (!isMain && task.group_folder !== groupFolder) {
return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] };
}
updateTask(args.task_id, { status: 'paused' });
return { content: [{ type: 'text', text: `Task ${args.task_id} paused.` }] };
}
),
tool(
'resume_task',
'Resume a paused task.',
{
task_id: z.string().describe('The task ID')
},
async (args) => {
const task = getTaskById(args.task_id);
if (!task) {
return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] };
}
if (!isMain && task.group_folder !== groupFolder) {
return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] };
}
// Recalculate next_run when resuming
const nextRun = calculateNextRun(task.schedule_type, task.schedule_value);
updateTask(args.task_id, { status: 'active', next_run: nextRun });
return { content: [{ type: 'text', text: `Task ${args.task_id} resumed. Next run: ${nextRun}` }] };
}
),
tool(
'cancel_task',
'Cancel and delete a scheduled task.',
{
task_id: z.string().describe('The task ID')
},
async (args) => {
const task = getTaskById(args.task_id);
if (!task) {
return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] };
}
if (!isMain && task.group_folder !== groupFolder) {
return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] };
}
deleteTask(args.task_id);
return { content: [{ type: 'text', text: `Task ${args.task_id} cancelled and deleted.` }] };
}
),
tool(
'send_message',
'Send a message to the WhatsApp group. Use this to notify the group about task results or updates.',
{
text: z.string().describe('The message text to send'),
target_jid: z.string().optional().describe('(Main channel only) Target group JID. Defaults to current group.')
},
async (args) => {
const targetJid = isMain && args.target_jid ? args.target_jid : chatJid;
try {
await sendMessage(targetJid, args.text);
return { content: [{ type: 'text', text: 'Message sent successfully.' }] };
} catch (error) {
return { content: [{ type: 'text', text: `Failed to send message: ${error}` }] };
}
}
)
]
});
}
export { calculateNextRun };

118
src/scheduler.ts Normal file
View File

@@ -0,0 +1,118 @@
import { query } from '@anthropic-ai/claude-agent-sdk';
import fs from 'fs';
import path from 'path';
import pino from 'pino';
import { CronExpressionParser } from 'cron-parser';
import { getDueTasks, updateTaskAfterRun, logTaskRun, getTaskById } from './db.js';
import { createSchedulerMcp } from './scheduler-mcp.js';
import { ScheduledTask } from './types.js';
import { GROUPS_DIR, SCHEDULER_POLL_INTERVAL } from './config.js';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
transport: { target: 'pino-pretty', options: { colorize: true } }
});
export interface SchedulerDependencies {
sendMessage: (jid: string, text: string) => Promise<void>;
}
async function runTask(task: ScheduledTask, deps: SchedulerDependencies): Promise<void> {
const startTime = Date.now();
const groupDir = path.join(GROUPS_DIR, task.group_folder);
fs.mkdirSync(groupDir, { recursive: true });
logger.info({ taskId: task.id, group: task.group_folder }, 'Running scheduled task');
// Create the scheduler MCP with task's group context
const schedulerMcp = createSchedulerMcp({
groupFolder: task.group_folder,
chatJid: task.chat_jid,
isMain: false, // Scheduled tasks run in their group's context, not as main
sendMessage: deps.sendMessage
});
let result: string | null = null;
let error: string | null = null;
try {
for await (const message of query({
prompt: task.prompt,
options: {
cwd: groupDir,
allowedTools: ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch', 'mcp__nanoclaw__*', 'mcp__gmail__*'],
permissionMode: 'bypassPermissions',
settingSources: ['project'],
mcpServers: {
nanoclaw: schedulerMcp,
gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] }
}
}
})) {
if ('result' in message && message.result) {
result = message.result as string;
}
}
logger.info({ taskId: task.id, durationMs: Date.now() - startTime }, 'Task completed successfully');
} catch (err) {
error = err instanceof Error ? err.message : String(err);
logger.error({ taskId: task.id, error }, 'Task failed');
}
const durationMs = Date.now() - startTime;
// Log the run
logTaskRun({
task_id: task.id,
run_at: new Date().toISOString(),
duration_ms: durationMs,
status: error ? 'error' : 'success',
result,
error
});
// Calculate next run
let nextRun: string | null = null;
if (task.schedule_type === 'cron') {
const interval = CronExpressionParser.parse(task.schedule_value);
nextRun = interval.next().toISOString();
} else if (task.schedule_type === 'interval') {
const ms = parseInt(task.schedule_value, 10);
nextRun = new Date(Date.now() + ms).toISOString();
}
// 'once' tasks don't have a next run
// Update task
const resultSummary = error ? `Error: ${error}` : (result ? result.slice(0, 200) : 'Completed');
updateTaskAfterRun(task.id, nextRun, resultSummary);
}
export function startSchedulerLoop(deps: SchedulerDependencies): void {
logger.info('Scheduler loop started');
const loop = async () => {
try {
const dueTasks = getDueTasks();
if (dueTasks.length > 0) {
logger.info({ count: dueTasks.length }, 'Found due tasks');
}
for (const task of dueTasks) {
// Re-check task status in case it was paused/cancelled
const currentTask = getTaskById(task.id);
if (!currentTask || currentTask.status !== 'active') {
continue;
}
await runTask(currentTask, deps);
}
} catch (err) {
logger.error({ err }, 'Error in scheduler loop');
}
setTimeout(loop, SCHEDULER_POLL_INTERVAL);
};
loop();
}

View File

@@ -17,3 +17,26 @@ export interface NewMessage {
content: string;
timestamp: string;
}
export interface ScheduledTask {
id: string;
group_folder: string;
chat_jid: string;
prompt: string;
schedule_type: 'cron' | 'interval' | 'once';
schedule_value: string;
next_run: string | null;
last_run: string | null;
last_result: string | null;
status: 'active' | 'paused' | 'completed';
created_at: string;
}
export interface TaskRunLog {
task_id: string;
run_at: string;
duration_ms: number;
status: 'success' | 'error';
result: string | null;
error: string | null;
}