From fa13b14dae7c127cd62fa4d73c04854a28a1b753 Mon Sep 17 00:00:00 2001 From: gavrielc Date: Sat, 31 Jan 2026 21:00:37 +0200 Subject: [PATCH] Add built-in scheduler with group-scoped tasks - Custom nanoclaw MCP server with scheduling tools (schedule_task, list_tasks, get_task, update_task, pause/resume/cancel_task, send_message) - Tasks run as full agents in their group's context - Support for cron, interval, and one-time schedules - Task run logging with duration and results - Main channel has Bash access for admin tasks (query DB, manage groups) - Other groups restricted to file operations only - Updated docs and requirements Co-Authored-By: Claude Opus 4.5 --- .gitignore | 2 + REQUIREMENTS.md | 32 +++-- SPEC.md | 94 ++++++++++---- groups/main/CLAUDE.md | 96 ++++++++++++-- package-lock.json | 26 +++- package.json | 4 +- src/config.ts | 2 + src/db.ts | 117 ++++++++++++++++- src/index.ts | 29 ++++- src/scheduler-mcp.ts | 284 ++++++++++++++++++++++++++++++++++++++++++ src/scheduler.ts | 118 ++++++++++++++++++ src/types.ts | 23 ++++ 12 files changed, 772 insertions(+), 55 deletions(-) create mode 100644 src/scheduler-mcp.ts create mode 100644 src/scheduler.ts diff --git a/.gitignore b/.gitignore index a80544d..e54e7d1 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ logs/ # IDE .idea/ .vscode/ + +agents-sdk-docs diff --git a/REQUIREMENTS.md b/REQUIREMENTS.md index 83409fc..4247c9b 100644 --- a/REQUIREMENTS.md +++ b/REQUIREMENTS.md @@ -42,14 +42,28 @@ A personal Claude assistant accessible via WhatsApp, with minimal custom code. - Old session IDs are archived to a file ### Scheduled Tasks -- Users can ask Claude to schedule cron jobs from any group -- Tasks run in the context of the group that created them (with that group's memory) -- Task output is logged to the group's folder +- Users can ask Claude to schedule recurring or one-time tasks from any group +- Tasks run as full agents in the context of the group that created them +- Tasks have access to the same tools as regular messages (except Bash) +- Tasks can optionally send messages to their group via `send_message` tool, or complete silently +- Task runs are logged to the database with duration and result +- Schedule types: cron expressions, intervals (ms), or one-time (ISO timestamp) +- From main: can schedule tasks for any group, view/manage all tasks +- From other groups: can only manage that group's tasks ### Group Management - New groups are added explicitly via the main channel -- Groups are identified by human-readable name when possible -- Each group gets a dedicated folder +- Main channel agent has Bash access to query the database and find group JIDs +- Groups are registered by editing `data/registered_groups.json` +- Each group gets a dedicated folder under `groups/` + +### Main Channel Privileges +- Main channel is the admin/control group (typically self-chat) +- Has Bash access for system commands and database queries +- Can write to global memory (`groups/CLAUDE.md`) +- Can schedule tasks for any group +- Can view and manage tasks from all groups +- Other groups do NOT have Bash access (security measure) --- @@ -65,8 +79,12 @@ A personal Claude assistant accessible via WhatsApp, with minimal custom code. - Optional, enabled during setup ### Scheduler -- MCP server for creating/managing scheduled tasks -- Tasks execute Claude Agent SDK in group context +- Built-in scheduler (not external MCP) - runs in-process +- Custom `nanoclaw` MCP server provides scheduling tools +- Tools: `schedule_task`, `list_tasks`, `get_task`, `update_task`, `pause_task`, `resume_task`, `cancel_task`, `send_message` +- Tasks stored in SQLite with run history +- Scheduler loop checks for due tasks every minute +- Tasks execute Claude Agent SDK in group context with full tool access ### Web Access - Built-in WebSearch and WebFetch tools diff --git a/SPEC.md b/SPEC.md index f502972..71d3a53 100644 --- a/SPEC.md +++ b/SPEC.md @@ -92,7 +92,9 @@ nanoclaw/ │ ├── config.ts # Configuration constants │ ├── types.ts # TypeScript interfaces │ ├── db.ts # Database initialization and queries -│ └── auth.ts # Standalone WhatsApp authentication +│ ├── auth.ts # Standalone WhatsApp authentication +│ ├── scheduler.ts # Scheduler loop (runs due tasks) +│ └── scheduler-mcp.ts # In-process MCP server for scheduling tools │ ├── dist/ # Compiled JavaScript (gitignored) │ @@ -115,7 +117,7 @@ nanoclaw/ │ ├── store/ # Local data (gitignored) │ ├── auth/ # WhatsApp authentication state -│ └── messages.db # SQLite message database +│ └── messages.db # SQLite database (messages, scheduled_tasks, task_run_logs) │ ├── data/ # Application state (gitignored) │ ├── sessions.json # Active session IDs per group @@ -196,7 +198,9 @@ NanoClaw uses a hierarchical memory system based on CLAUDE.md files. 3. **Main Channel Privileges** - Only the "main" group (self-chat) can write to global memory - - This prevents other groups from modifying shared context + - Main has **Bash access** for admin tasks (querying DB, system commands) + - Main can manage registered groups and schedule tasks for any group + - Other groups do NOT have Bash access (security measure) --- @@ -321,35 +325,82 @@ This allows the agent to understand the conversation context even if it wasn't m ## Scheduled Tasks -NanoClaw can schedule recurring tasks that run at specified times via the scheduler MCP. +NanoClaw has a built-in scheduler that runs tasks as full agents in their group's context. + +### How Scheduling Works + +1. **Group Context**: Tasks created in a group run with that group's working directory and memory +2. **Full Agent Capabilities**: Scheduled tasks have access to all tools (WebSearch, Gmail, file operations, etc.) +3. **Optional Messaging**: Tasks can send messages to their group using the `send_message` tool, or complete silently +4. **Main Channel Privileges**: The main channel can schedule tasks for any group and view all tasks + +### Schedule Types + +| Type | Value Format | Example | +|------|--------------|---------| +| `cron` | Cron expression | `0 9 * * 1` (Mondays at 9am) | +| `interval` | Milliseconds | `3600000` (every hour) | +| `once` | ISO timestamp | `2024-12-25T09:00:00Z` | ### Creating a Task ``` User: @Andy remind me every Monday at 9am to review the weekly metrics -Claude: [calls mcp__scheduler__create_task] +Claude: [calls mcp__nanoclaw__schedule_task] { - "instruction": "Remind user to review weekly metrics", - "trigger_type": "cron", - "cron_expression": "0 9 * * 1" + "prompt": "Send a reminder to review weekly metrics. Be encouraging!", + "schedule_type": "cron", + "schedule_value": "0 9 * * 1" } Claude: Done! I'll remind you every Monday at 9am. ``` +### One-Time Tasks + +``` +User: @Andy at 5pm today, send me a summary of today's emails + +Claude: [calls mcp__nanoclaw__schedule_task] + { + "prompt": "Search for today's emails, summarize the important ones, and send the summary to the group.", + "schedule_type": "once", + "schedule_value": "2024-01-31T17:00:00Z" + } +``` + +### Managing Tasks + +From any group: +- `@Andy list my scheduled tasks` - View tasks for this group +- `@Andy pause task [id]` - Pause a task +- `@Andy resume task [id]` - Resume a paused task +- `@Andy cancel task [id]` - Delete a task + +From main channel: +- `@Andy list all tasks` - View tasks from all groups +- `@Andy schedule task for "Family Chat": [prompt]` - Schedule for another group + --- ## MCP Servers -MCP servers are configured in the Claude Agent SDK options: +### NanoClaw MCP (built-in) -```typescript -mcpServers: { - gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] }, - scheduler: { command: 'npx', args: ['-y', 'schedule-task-mcp'] } -} -``` +The `nanoclaw` MCP server is created dynamically per agent call with the current group's context. + +**Available Tools:** +| Tool | Purpose | +|------|---------| +| `schedule_task` | Schedule a recurring or one-time task | +| `list_tasks` | Show tasks (group's tasks, or all if main) | +| `get_task` | Get task details and run history | +| `update_task` | Modify task prompt or schedule | +| `pause_task` | Pause a task | +| `resume_task` | Resume a paused task | +| `cancel_task` | Delete a task | +| `send_message` | Send a WhatsApp message to the group | ### Gmail MCP (@gongrzhe/server-gmail-autoauth-mcp) @@ -363,18 +414,6 @@ Provides email capabilities. Requires Google Cloud OAuth setup. | `send_message` | Send email | | `reply_message` | Reply to thread | -### Scheduler MCP (schedule-task-mcp) - -Provides cron-style task scheduling. - -**Available Tools:** -| Tool | Purpose | -|------|---------| -| `create_task` | Schedule a new task | -| `list_tasks` | Show scheduled tasks | -| `delete_task` | Cancel a task | -| `update_task` | Modify schedule | - --- ## Deployment @@ -450,6 +489,7 @@ WhatsApp messages could contain malicious instructions attempting to manipulate - Only registered groups are processed - Trigger word required (reduces accidental processing) - Main channel has elevated privileges (isolated from other groups) +- Regular groups do NOT have Bash access (only main does) - Claude's built-in safety training **Recommendations:** diff --git a/groups/main/CLAUDE.md b/groups/main/CLAUDE.md index 08deb4b..8ae2fd5 100644 --- a/groups/main/CLAUDE.md +++ b/groups/main/CLAUDE.md @@ -1,18 +1,92 @@ -# Main Channel +# Main Channel - Admin Context -Self-chat - the primary control channel for NanoClaw. +You are running in the **main channel**, which has elevated privileges. You can: +- Manage registered groups (add, remove, list) +- Schedule tasks for any group +- View tasks from all groups +- Access all group folders +- **Run Bash commands** (only main has this access) -## Permissions +--- -This channel can: -- Write to global memory (../CLAUDE.md) -- Add/remove groups -- Manage scheduled tasks across all groups +## Managing Groups -## Memory +### Finding Available Groups - +Groups appear in the database when messages are received. Query the SQLite database to find groups: -## Files +```sql +-- Find all group chats (JIDs ending in @g.us) +SELECT DISTINCT chat_jid, name FROM chats WHERE chat_jid LIKE '%@g.us'; - +-- Or find chats with recent messages +SELECT chat_jid, MAX(timestamp) as last_message +FROM messages +WHERE chat_jid LIKE '%@g.us' +GROUP BY chat_jid +ORDER BY last_message DESC; +``` + +Database location: `store/messages.db` + +### Registered Groups Config + +Groups are registered in `data/registered_groups.json`: + +```json +{ + "1234567890-1234567890@g.us": { + "name": "Family Chat", + "folder": "family-chat", + "trigger": "@Andy", + "added_at": "2024-01-31T12:00:00.000Z" + } +} +``` + +Fields: +- **Key**: The WhatsApp JID (unique identifier for the chat) +- **name**: Display name for the group +- **folder**: Folder name under `groups/` for this group's files and memory +- **trigger**: The trigger word (usually same as global, but could differ) +- **added_at**: ISO timestamp when registered + +### Adding a Group + +1. Query the database to find the group's JID +2. Read the current `data/registered_groups.json` +3. Add the new group entry +4. Write the updated JSON back +5. Create the group folder: `groups/{folder-name}/` +6. Optionally create an initial `CLAUDE.md` for the group + +Example folder name conventions: +- "Family Chat" → `family-chat` +- "Work Team" → `work-team` +- Use lowercase, hyphens instead of spaces + +### Removing a Group + +1. Read `data/registered_groups.json` +2. Remove the entry for that group +3. Write the updated JSON back +4. The group folder and its files remain (don't delete them) + +### Listing Groups + +Read `data/registered_groups.json` and format it nicely for the user. + +--- + +## Global Memory + +You can read and write to `groups/CLAUDE.md` (the parent directory) for facts that should apply to all groups. Only update global memory when explicitly asked to "remember this globally" or similar. + +--- + +## Scheduling for Other Groups + +When scheduling tasks for other groups, use the `target_group` parameter: +- `schedule_task(prompt: "...", schedule_type: "cron", schedule_value: "0 9 * * 1", target_group: "family-chat")` + +The task will run in that group's context with access to their files and memory. diff --git a/package-lock.json b/package-lock.json index c46f756..ec5e60f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,9 +11,11 @@ "@anthropic-ai/claude-agent-sdk": "^0.1.0", "@whiskeysockets/baileys": "^7.0.0-rc.9", "better-sqlite3": "^11.8.1", + "cron-parser": "^5.5.0", "pino": "^9.6.0", "pino-pretty": "^13.0.0", - "qrcode-terminal": "^0.12.0" + "qrcode-terminal": "^0.12.0", + "zod": "^4.3.6" }, "devDependencies": { "@types/better-sqlite3": "^7.6.12", @@ -1350,6 +1352,18 @@ "node": ">= 0.6" } }, + "node_modules/cron-parser": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.5.0.tgz", + "integrity": "sha512-oML4lKUXxizYswqmxuOCpgFS8BNUJpIu6k/2HVHyaL8Ynnf3wdf9tkns0yRdJLSIjkJ+b0DXHMZEHGpMwjnPww==", + "license": "MIT", + "dependencies": { + "luxon": "^3.7.1" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/curve25519-js": { "version": "0.0.4", "resolved": "https://registry.npmjs.org/curve25519-js/-/curve25519-js-0.0.4.tgz", @@ -1694,6 +1708,15 @@ "node": "20 || >=22" } }, + "node_modules/luxon": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/luxon/-/luxon-3.7.2.tgz", + "integrity": "sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==", + "license": "MIT", + "engines": { + "node": ">=12" + } + }, "node_modules/media-typer": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/media-typer/-/media-typer-1.1.0.tgz", @@ -2731,7 +2754,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-4.3.6.tgz", "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/package.json b/package.json index 255a3e5..c65c5be 100644 --- a/package.json +++ b/package.json @@ -16,9 +16,11 @@ "@anthropic-ai/claude-agent-sdk": "^0.1.0", "@whiskeysockets/baileys": "^7.0.0-rc.9", "better-sqlite3": "^11.8.1", + "cron-parser": "^5.5.0", "pino": "^9.6.0", "pino-pretty": "^13.0.0", - "qrcode-terminal": "^0.12.0" + "qrcode-terminal": "^0.12.0", + "zod": "^4.3.6" }, "devDependencies": { "@types/better-sqlite3": "^7.6.12", diff --git a/src/config.ts b/src/config.ts index 49dc384..c4afc3d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,8 +1,10 @@ export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy'; export const POLL_INTERVAL = 2000; +export const SCHEDULER_POLL_INTERVAL = 60000; // Check for due tasks every minute export const STORE_DIR = './store'; export const GROUPS_DIR = './groups'; export const DATA_DIR = './data'; +export const MAIN_GROUP_FOLDER = 'main'; export const TRIGGER_PATTERN = new RegExp(`^@${ASSISTANT_NAME}\\b`, 'i'); export const CLEAR_COMMAND = '/clear'; diff --git a/src/db.ts b/src/db.ts index 2056575..294e534 100644 --- a/src/db.ts +++ b/src/db.ts @@ -2,7 +2,7 @@ import Database from 'better-sqlite3'; import fs from 'fs'; import path from 'path'; import { proto } from '@whiskeysockets/baileys'; -import { NewMessage } from './types.js'; +import { NewMessage, ScheduledTask, TaskRunLog } from './types.js'; import { STORE_DIR } from './config.js'; let db: Database.Database; @@ -30,6 +30,34 @@ export function initDatabase(): void { FOREIGN KEY (chat_jid) REFERENCES chats(jid) ); CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp); + + CREATE TABLE IF NOT EXISTS scheduled_tasks ( + id TEXT PRIMARY KEY, + group_folder TEXT NOT NULL, + chat_jid TEXT NOT NULL, + prompt TEXT NOT NULL, + schedule_type TEXT NOT NULL, + schedule_value TEXT NOT NULL, + next_run TEXT, + last_run TEXT, + last_result TEXT, + status TEXT DEFAULT 'active', + created_at TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_next_run ON scheduled_tasks(next_run); + CREATE INDEX IF NOT EXISTS idx_status ON scheduled_tasks(status); + + CREATE TABLE IF NOT EXISTS task_run_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + run_at TEXT NOT NULL, + duration_ms INTEGER NOT NULL, + status TEXT NOT NULL, + result TEXT, + error TEXT, + FOREIGN KEY (task_id) REFERENCES scheduled_tasks(id) + ); + CREATE INDEX IF NOT EXISTS idx_task_run_logs ON task_run_logs(task_id, run_at); `); // Add sender_name column if it doesn't exist (migration for existing DBs) @@ -89,3 +117,90 @@ export function getMessagesSince(chatJid: string, sinceTimestamp: string): NewMe `; return db.prepare(sql).all(chatJid, sinceTimestamp) as NewMessage[]; } + +// Scheduled Tasks + +export function createTask(task: Omit): void { + db.prepare(` + INSERT INTO scheduled_tasks (id, group_folder, chat_jid, prompt, schedule_type, schedule_value, next_run, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `).run( + task.id, + task.group_folder, + task.chat_jid, + task.prompt, + task.schedule_type, + task.schedule_value, + task.next_run, + task.status, + task.created_at + ); +} + +export function getTaskById(id: string): ScheduledTask | undefined { + return db.prepare('SELECT * FROM scheduled_tasks WHERE id = ?').get(id) as ScheduledTask | undefined; +} + +export function getTasksForGroup(groupFolder: string): ScheduledTask[] { + return db.prepare('SELECT * FROM scheduled_tasks WHERE group_folder = ? ORDER BY created_at DESC').all(groupFolder) as ScheduledTask[]; +} + +export function getAllTasks(): ScheduledTask[] { + return db.prepare('SELECT * FROM scheduled_tasks ORDER BY created_at DESC').all() as ScheduledTask[]; +} + +export function updateTask(id: string, updates: Partial>): void { + const fields: string[] = []; + const values: unknown[] = []; + + if (updates.prompt !== undefined) { fields.push('prompt = ?'); values.push(updates.prompt); } + if (updates.schedule_type !== undefined) { fields.push('schedule_type = ?'); values.push(updates.schedule_type); } + if (updates.schedule_value !== undefined) { fields.push('schedule_value = ?'); values.push(updates.schedule_value); } + if (updates.next_run !== undefined) { fields.push('next_run = ?'); values.push(updates.next_run); } + if (updates.status !== undefined) { fields.push('status = ?'); values.push(updates.status); } + + if (fields.length === 0) return; + + values.push(id); + db.prepare(`UPDATE scheduled_tasks SET ${fields.join(', ')} WHERE id = ?`).run(...values); +} + +export function deleteTask(id: string): void { + db.prepare('DELETE FROM scheduled_tasks WHERE id = ?').run(id); + db.prepare('DELETE FROM task_run_logs WHERE task_id = ?').run(id); +} + +export function getDueTasks(): ScheduledTask[] { + const now = new Date().toISOString(); + return db.prepare(` + SELECT * FROM scheduled_tasks + WHERE status = 'active' AND next_run IS NOT NULL AND next_run <= ? + ORDER BY next_run + `).all(now) as ScheduledTask[]; +} + +export function updateTaskAfterRun(id: string, nextRun: string | null, lastResult: string): void { + const now = new Date().toISOString(); + db.prepare(` + UPDATE scheduled_tasks + SET next_run = ?, last_run = ?, last_result = ?, status = CASE WHEN ? IS NULL THEN 'completed' ELSE status END + WHERE id = ? + `).run(nextRun, now, lastResult, nextRun, id); +} + +export function logTaskRun(log: TaskRunLog): void { + db.prepare(` + INSERT INTO task_run_logs (task_id, run_at, duration_ms, status, result, error) + VALUES (?, ?, ?, ?, ?, ?) + `).run(log.task_id, log.run_at, log.duration_ms, log.status, log.result, log.error); +} + +export function getTaskRunLogs(taskId: string, limit = 10): TaskRunLog[] { + return db.prepare(` + SELECT task_id, run_at, duration_ms, status, result, error + FROM task_run_logs + WHERE task_id = ? + ORDER BY run_at DESC + LIMIT ? + `).all(taskId, limit) as TaskRunLog[]; +} diff --git a/src/index.ts b/src/index.ts index 06d90f8..c91b267 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,10 +17,13 @@ import { GROUPS_DIR, DATA_DIR, TRIGGER_PATTERN, - CLEAR_COMMAND + CLEAR_COMMAND, + MAIN_GROUP_FOLDER } from './config.js'; import { RegisteredGroup, Session, NewMessage } from './types.js'; import { initDatabase, storeMessage, getNewMessages, getMessagesSince } from './db.js'; +import { createSchedulerMcp } from './scheduler-mcp.js'; +import { startSchedulerLoop } from './scheduler.js'; const logger = pino({ level: process.env.LOG_LEVEL || 'info', @@ -104,7 +107,7 @@ async function processMessage(msg: NewMessage): Promise { if (!prompt) return; logger.info({ group: group.name, messageCount: missedMessages.length }, 'Processing message'); - const response = await runAgent(group, prompt); + const response = await runAgent(group, prompt, msg.chat_jid); // Update last agent timestamp lastAgentTimestamp[msg.chat_jid] = msg.timestamp; @@ -114,26 +117,39 @@ async function processMessage(msg: NewMessage): Promise { } } -async function runAgent(group: RegisteredGroup, prompt: string): Promise { +async function runAgent(group: RegisteredGroup, prompt: string, chatJid: string): Promise { const groupDir = path.join(GROUPS_DIR, group.folder); fs.mkdirSync(groupDir, { recursive: true }); + const isMain = group.folder === MAIN_GROUP_FOLDER; const sessionId = sessions[group.folder]; let newSessionId: string | undefined; let result: string | null = null; + // Create scheduler MCP with current group context + const schedulerMcp = createSchedulerMcp({ + groupFolder: group.folder, + chatJid, + isMain, + sendMessage + }); + + // Main channel gets Bash access for admin tasks (querying DB, etc.) + const baseTools = ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch', 'mcp__nanoclaw__*', 'mcp__gmail__*']; + const allowedTools = isMain ? [...baseTools, 'Bash'] : baseTools; + try { for await (const message of query({ prompt, options: { cwd: groupDir, resume: sessionId, - allowedTools: ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch'], + allowedTools, permissionMode: 'bypassPermissions', settingSources: ['project'], mcpServers: { - gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] }, - scheduler: { command: 'npx', args: ['-y', 'schedule-task-mcp'] } + nanoclaw: schedulerMcp, + gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] } } } })) { @@ -203,6 +219,7 @@ async function connectWhatsApp(): Promise { } } else if (connection === 'open') { logger.info('Connected to WhatsApp'); + startSchedulerLoop({ sendMessage }); startMessageLoop(); } }); diff --git a/src/scheduler-mcp.ts b/src/scheduler-mcp.ts new file mode 100644 index 0000000..971ed8f --- /dev/null +++ b/src/scheduler-mcp.ts @@ -0,0 +1,284 @@ +import { createSdkMcpServer, tool } from '@anthropic-ai/claude-agent-sdk'; +import { z } from 'zod'; +import { CronExpressionParser } from 'cron-parser'; +import { + createTask, + getTaskById, + getTasksForGroup, + getAllTasks, + updateTask, + deleteTask, + getTaskRunLogs +} from './db.js'; +import { ScheduledTask } from './types.js'; +import { MAIN_GROUP_FOLDER } from './config.js'; + +function generateTaskId(): string { + return `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +function calculateNextRun(scheduleType: string, scheduleValue: string): string | null { + const now = new Date(); + + switch (scheduleType) { + case 'cron': { + const interval = CronExpressionParser.parse(scheduleValue); + return interval.next().toISOString(); + } + case 'interval': { + const ms = parseInt(scheduleValue, 10); + return new Date(now.getTime() + ms).toISOString(); + } + case 'once': { + const runAt = new Date(scheduleValue); + return runAt > now ? runAt.toISOString() : null; + } + default: + return null; + } +} + +function formatTask(task: ScheduledTask): string { + const lines = [ + `ID: ${task.id}`, + `Group: ${task.group_folder}`, + `Prompt: ${task.prompt}`, + `Schedule: ${task.schedule_type} (${task.schedule_value})`, + `Status: ${task.status}`, + `Next run: ${task.next_run || 'N/A'}`, + `Last run: ${task.last_run || 'Never'}`, + `Last result: ${task.last_result || 'N/A'}` + ]; + return lines.join('\n'); +} + +export interface SchedulerMcpContext { + groupFolder: string; + chatJid: string; + isMain: boolean; + sendMessage: (jid: string, text: string) => Promise; +} + +export function createSchedulerMcp(ctx: SchedulerMcpContext) { + const { groupFolder, chatJid, isMain, sendMessage } = ctx; + + return createSdkMcpServer({ + name: 'nanoclaw', + version: '1.0.0', + tools: [ + tool( + 'schedule_task', + 'Schedule a recurring or one-time task. The task will run as an agent in the current group context.', + { + prompt: z.string().describe('The prompt/instruction for the task when it runs'), + schedule_type: z.enum(['cron', 'interval', 'once']).describe('Type of schedule: cron (e.g., "0 9 * * 1" for Mondays at 9am), interval (milliseconds), or once (ISO timestamp)'), + schedule_value: z.string().describe('Schedule value: cron expression, milliseconds for interval, or ISO timestamp for once'), + target_group: z.string().optional().describe('(Main channel only) Target group folder to run the task in. Defaults to current group.') + }, + async (args) => { + const targetGroup = isMain && args.target_group ? args.target_group : groupFolder; + const targetJid = isMain && args.target_group ? '' : chatJid; // Will need to look up JID for other groups + + // Validate schedule + const nextRun = calculateNextRun(args.schedule_type, args.schedule_value); + if (nextRun === null && args.schedule_type !== 'once') { + return { content: [{ type: 'text', text: 'Error: Invalid schedule. Task would never run.' }] }; + } + + const task: Omit = { + id: generateTaskId(), + group_folder: targetGroup, + chat_jid: targetJid || chatJid, + prompt: args.prompt, + schedule_type: args.schedule_type, + schedule_value: args.schedule_value, + next_run: nextRun, + status: 'active', + created_at: new Date().toISOString() + }; + + createTask(task); + + return { + content: [{ + type: 'text', + text: `Task scheduled successfully!\n\n${formatTask(task as ScheduledTask)}` + }] + }; + } + ), + + tool( + 'list_tasks', + 'List scheduled tasks. Shows tasks for the current group, or all tasks if called from the main channel.', + {}, + async () => { + const tasks = isMain ? getAllTasks() : getTasksForGroup(groupFolder); + + if (tasks.length === 0) { + return { content: [{ type: 'text', text: 'No scheduled tasks found.' }] }; + } + + const formatted = tasks.map((t, i) => `--- Task ${i + 1} ---\n${formatTask(t)}`).join('\n\n'); + return { content: [{ type: 'text', text: `Found ${tasks.length} task(s):\n\n${formatted}` }] }; + } + ), + + tool( + 'get_task', + 'Get details about a specific task including run history.', + { + task_id: z.string().describe('The task ID') + }, + async (args) => { + const task = getTaskById(args.task_id); + if (!task) { + return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] }; + } + + // Check permissions + if (!isMain && task.group_folder !== groupFolder) { + return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] }; + } + + const logs = getTaskRunLogs(args.task_id, 5); + let output = formatTask(task); + + if (logs.length > 0) { + output += '\n\n--- Recent Runs ---\n'; + output += logs.map(l => + `${l.run_at}: ${l.status} (${l.duration_ms}ms)${l.error ? ` - ${l.error}` : ''}` + ).join('\n'); + } + + return { content: [{ type: 'text', text: output }] }; + } + ), + + tool( + 'update_task', + 'Update a scheduled task.', + { + task_id: z.string().describe('The task ID'), + prompt: z.string().optional().describe('New prompt for the task'), + schedule_type: z.enum(['cron', 'interval', 'once']).optional().describe('New schedule type'), + schedule_value: z.string().optional().describe('New schedule value') + }, + async (args) => { + const task = getTaskById(args.task_id); + if (!task) { + return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] }; + } + + if (!isMain && task.group_folder !== groupFolder) { + return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] }; + } + + const updates: Parameters[1] = {}; + if (args.prompt) updates.prompt = args.prompt; + if (args.schedule_type) updates.schedule_type = args.schedule_type; + if (args.schedule_value) updates.schedule_value = args.schedule_value; + + // Recalculate next_run if schedule changed + if (args.schedule_type || args.schedule_value) { + const schedType = args.schedule_type || task.schedule_type; + const schedValue = args.schedule_value || task.schedule_value; + updates.next_run = calculateNextRun(schedType, schedValue); + } + + updateTask(args.task_id, updates); + const updated = getTaskById(args.task_id)!; + + return { content: [{ type: 'text', text: `Task updated!\n\n${formatTask(updated)}` }] }; + } + ), + + tool( + 'pause_task', + 'Pause a scheduled task.', + { + task_id: z.string().describe('The task ID') + }, + async (args) => { + const task = getTaskById(args.task_id); + if (!task) { + return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] }; + } + + if (!isMain && task.group_folder !== groupFolder) { + return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] }; + } + + updateTask(args.task_id, { status: 'paused' }); + return { content: [{ type: 'text', text: `Task ${args.task_id} paused.` }] }; + } + ), + + tool( + 'resume_task', + 'Resume a paused task.', + { + task_id: z.string().describe('The task ID') + }, + async (args) => { + const task = getTaskById(args.task_id); + if (!task) { + return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] }; + } + + if (!isMain && task.group_folder !== groupFolder) { + return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] }; + } + + // Recalculate next_run when resuming + const nextRun = calculateNextRun(task.schedule_type, task.schedule_value); + updateTask(args.task_id, { status: 'active', next_run: nextRun }); + + return { content: [{ type: 'text', text: `Task ${args.task_id} resumed. Next run: ${nextRun}` }] }; + } + ), + + tool( + 'cancel_task', + 'Cancel and delete a scheduled task.', + { + task_id: z.string().describe('The task ID') + }, + async (args) => { + const task = getTaskById(args.task_id); + if (!task) { + return { content: [{ type: 'text', text: `Task not found: ${args.task_id}` }] }; + } + + if (!isMain && task.group_folder !== groupFolder) { + return { content: [{ type: 'text', text: 'Access denied: Task belongs to another group.' }] }; + } + + deleteTask(args.task_id); + return { content: [{ type: 'text', text: `Task ${args.task_id} cancelled and deleted.` }] }; + } + ), + + tool( + 'send_message', + 'Send a message to the WhatsApp group. Use this to notify the group about task results or updates.', + { + text: z.string().describe('The message text to send'), + target_jid: z.string().optional().describe('(Main channel only) Target group JID. Defaults to current group.') + }, + async (args) => { + const targetJid = isMain && args.target_jid ? args.target_jid : chatJid; + + try { + await sendMessage(targetJid, args.text); + return { content: [{ type: 'text', text: 'Message sent successfully.' }] }; + } catch (error) { + return { content: [{ type: 'text', text: `Failed to send message: ${error}` }] }; + } + } + ) + ] + }); +} + +export { calculateNextRun }; diff --git a/src/scheduler.ts b/src/scheduler.ts new file mode 100644 index 0000000..8b1fc0d --- /dev/null +++ b/src/scheduler.ts @@ -0,0 +1,118 @@ +import { query } from '@anthropic-ai/claude-agent-sdk'; +import fs from 'fs'; +import path from 'path'; +import pino from 'pino'; +import { CronExpressionParser } from 'cron-parser'; +import { getDueTasks, updateTaskAfterRun, logTaskRun, getTaskById } from './db.js'; +import { createSchedulerMcp } from './scheduler-mcp.js'; +import { ScheduledTask } from './types.js'; +import { GROUPS_DIR, SCHEDULER_POLL_INTERVAL } from './config.js'; + +const logger = pino({ + level: process.env.LOG_LEVEL || 'info', + transport: { target: 'pino-pretty', options: { colorize: true } } +}); + +export interface SchedulerDependencies { + sendMessage: (jid: string, text: string) => Promise; +} + +async function runTask(task: ScheduledTask, deps: SchedulerDependencies): Promise { + const startTime = Date.now(); + const groupDir = path.join(GROUPS_DIR, task.group_folder); + fs.mkdirSync(groupDir, { recursive: true }); + + logger.info({ taskId: task.id, group: task.group_folder }, 'Running scheduled task'); + + // Create the scheduler MCP with task's group context + const schedulerMcp = createSchedulerMcp({ + groupFolder: task.group_folder, + chatJid: task.chat_jid, + isMain: false, // Scheduled tasks run in their group's context, not as main + sendMessage: deps.sendMessage + }); + + let result: string | null = null; + let error: string | null = null; + + try { + for await (const message of query({ + prompt: task.prompt, + options: { + cwd: groupDir, + allowedTools: ['Read', 'Write', 'Edit', 'Glob', 'Grep', 'WebSearch', 'WebFetch', 'mcp__nanoclaw__*', 'mcp__gmail__*'], + permissionMode: 'bypassPermissions', + settingSources: ['project'], + mcpServers: { + nanoclaw: schedulerMcp, + gmail: { command: 'npx', args: ['-y', '@gongrzhe/server-gmail-autoauth-mcp'] } + } + } + })) { + if ('result' in message && message.result) { + result = message.result as string; + } + } + + logger.info({ taskId: task.id, durationMs: Date.now() - startTime }, 'Task completed successfully'); + } catch (err) { + error = err instanceof Error ? err.message : String(err); + logger.error({ taskId: task.id, error }, 'Task failed'); + } + + const durationMs = Date.now() - startTime; + + // Log the run + logTaskRun({ + task_id: task.id, + run_at: new Date().toISOString(), + duration_ms: durationMs, + status: error ? 'error' : 'success', + result, + error + }); + + // Calculate next run + let nextRun: string | null = null; + if (task.schedule_type === 'cron') { + const interval = CronExpressionParser.parse(task.schedule_value); + nextRun = interval.next().toISOString(); + } else if (task.schedule_type === 'interval') { + const ms = parseInt(task.schedule_value, 10); + nextRun = new Date(Date.now() + ms).toISOString(); + } + // 'once' tasks don't have a next run + + // Update task + const resultSummary = error ? `Error: ${error}` : (result ? result.slice(0, 200) : 'Completed'); + updateTaskAfterRun(task.id, nextRun, resultSummary); +} + +export function startSchedulerLoop(deps: SchedulerDependencies): void { + logger.info('Scheduler loop started'); + + const loop = async () => { + try { + const dueTasks = getDueTasks(); + if (dueTasks.length > 0) { + logger.info({ count: dueTasks.length }, 'Found due tasks'); + } + + for (const task of dueTasks) { + // Re-check task status in case it was paused/cancelled + const currentTask = getTaskById(task.id); + if (!currentTask || currentTask.status !== 'active') { + continue; + } + + await runTask(currentTask, deps); + } + } catch (err) { + logger.error({ err }, 'Error in scheduler loop'); + } + + setTimeout(loop, SCHEDULER_POLL_INTERVAL); + }; + + loop(); +} diff --git a/src/types.ts b/src/types.ts index dea2a56..44652aa 100644 --- a/src/types.ts +++ b/src/types.ts @@ -17,3 +17,26 @@ export interface NewMessage { content: string; timestamp: string; } + +export interface ScheduledTask { + id: string; + group_folder: string; + chat_jid: string; + prompt: string; + schedule_type: 'cron' | 'interval' | 'once'; + schedule_value: string; + next_run: string | null; + last_run: string | null; + last_result: string | null; + status: 'active' | 'paused' | 'completed'; + created_at: string; +} + +export interface TaskRunLog { + task_id: string; + run_at: string; + duration_ms: number; + status: 'success' | 'error'; + result: string | null; + error: string | null; +}