* feat: add Telegram channel with agent swarm support Add Telegram as a messaging channel that can run alongside WhatsApp or standalone (TELEGRAM_ONLY mode). Includes bot pool support for agent swarms where each subagent appears as a different bot identity in the group. - Add grammy dependency for Telegram Bot API - Route messages through tg: JID prefix convention - Add storeMessageDirect for non-Baileys channels - Add sender field to IPC send_message for swarm identity - Support TELEGRAM_BOT_TOKEN, TELEGRAM_ONLY, TELEGRAM_BOT_POOL config Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * docs: add index.ts refactor plan Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: extract channel abstraction, IPC, and router from index.ts Break the 1088-line monolith into focused modules: - src/channels/whatsapp.ts: WhatsAppChannel class implementing Channel interface - src/ipc.ts: IPC watcher and task processing with dependency injection - src/router.ts: message formatting, outbound routing, channel lookup - src/types.ts: Channel interface, OnInboundMessage, OnChatMetadata types Also adds regression test suite (98 tests), updates all documentation and skill files to reflect the new architecture. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * ci: add test workflow for PRs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: remove accidentally committed pool-bot assets Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(ci): remove grammy from base dependencies Grammy is installed by the /add-telegram skill, not a base dependency. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
219 lines
6.2 KiB
TypeScript
219 lines
6.2 KiB
TypeScript
import { ChildProcess } from 'child_process';
|
|
import { CronExpressionParser } from 'cron-parser';
|
|
import fs from 'fs';
|
|
import path from 'path';
|
|
|
|
import {
|
|
GROUPS_DIR,
|
|
IDLE_TIMEOUT,
|
|
MAIN_GROUP_FOLDER,
|
|
SCHEDULER_POLL_INTERVAL,
|
|
TIMEZONE,
|
|
} from './config.js';
|
|
import { ContainerOutput, runContainerAgent, writeTasksSnapshot } from './container-runner.js';
|
|
import {
|
|
getAllTasks,
|
|
getDueTasks,
|
|
getTaskById,
|
|
logTaskRun,
|
|
updateTaskAfterRun,
|
|
} from './db.js';
|
|
import { GroupQueue } from './group-queue.js';
|
|
import { logger } from './logger.js';
|
|
import { RegisteredGroup, ScheduledTask } from './types.js';
|
|
|
|
/**
 * Collaborators the scheduler needs from the host application.
 *
 * They are injected rather than imported so the scheduler does not own
 * group registration, session state, the run queue, or message delivery.
 */
export interface SchedulerDependencies {
  /**
   * Returns the currently registered groups. `runTask` searches the values
   * for the entry whose `folder` matches the task's `group_folder`.
   */
  registeredGroups: () => Record<string, RegisteredGroup>;

  /**
   * Returns the session map. `runTask` looks it up by the task's
   * `group_folder` when the task runs in `'group'` context mode.
   */
  getSessions: () => Record<string, string>;

  /** Queue used to enqueue due tasks and to close a container's stdin. */
  queue: GroupQueue;

  /**
   * Passed to `runContainerAgent` by `runTask`, which invokes it with the
   * task's chat JID, the process handle, the container name, and the task's
   * group folder.
   */
  onProcess: (
    groupJid: string,
    proc: ChildProcess,
    containerName: string,
    groupFolder: string,
  ) => void;

  /** Delivers `text` to the chat identified by `jid`. */
  sendMessage: (jid: string, text: string) => Promise<void>;
}
|
|
|
|
/**
 * Compute the ISO timestamp of a task's next run from its schedule.
 *
 * - `cron`: the next occurrence of the cron expression, evaluated in TIMEZONE.
 * - `interval`: now plus `schedule_value` milliseconds.
 * - any other schedule type (e.g. `once`): `null`, i.e. no next run.
 *
 * @throws Error when the cron expression cannot be parsed, or when the
 *   interval is not a positive number of milliseconds.
 */
function computeNextRun(task: ScheduledTask): string | null {
  if (task.schedule_type === 'cron') {
    const interval = CronExpressionParser.parse(task.schedule_value, {
      tz: TIMEZONE,
    });
    return interval.next().toISOString();
  }

  if (task.schedule_type === 'interval') {
    const ms = parseInt(task.schedule_value, 10);
    // NaN would make toISOString() throw a bare RangeError, and a zero or
    // negative interval would schedule the task for "now or earlier".
    if (!Number.isFinite(ms) || ms <= 0) {
      throw new Error(`Invalid interval: ${task.schedule_value}`);
    }
    return new Date(Date.now() + ms).toISOString();
  }

  // 'once' tasks have no next run
  return null;
}

/**
 * Execute one scheduled task inside its group's container and record the
 * outcome.
 *
 * Steps:
 *  1. Ensure the group directory exists and resolve the registered group
 *     (logs an error run and returns if the group is unknown).
 *  2. Write a tasks snapshot for the container to read.
 *  3. Run the container agent, forwarding every streamed result to the
 *     task's chat via `deps.sendMessage`.
 *  4. Log the run, compute the next run time, and update the task row.
 *
 * Failures of the container run are caught and recorded, not rethrown.
 * A schedule that cannot be evaluated is also recorded: the task is updated
 * with no next run and an error summary instead of aborting before
 * `updateTaskAfterRun` is called.
 *
 * @param task The task to run.
 * @param deps Host-provided collaborators (groups, sessions, queue, messaging).
 */
async function runTask(
  task: ScheduledTask,
  deps: SchedulerDependencies,
): Promise<void> {
  const startTime = Date.now();
  const groupDir = path.join(GROUPS_DIR, task.group_folder);
  fs.mkdirSync(groupDir, { recursive: true });

  logger.info(
    { taskId: task.id, group: task.group_folder },
    'Running scheduled task',
  );

  const groups = deps.registeredGroups();
  const group = Object.values(groups).find(
    (g) => g.folder === task.group_folder,
  );

  if (!group) {
    logger.error(
      { taskId: task.id, groupFolder: task.group_folder },
      'Group not found for task',
    );
    logTaskRun({
      task_id: task.id,
      run_at: new Date().toISOString(),
      duration_ms: Date.now() - startTime,
      status: 'error',
      result: null,
      error: `Group not found: ${task.group_folder}`,
    });
    return;
  }

  // Update tasks snapshot for container to read (filtered by group)
  const isMain = task.group_folder === MAIN_GROUP_FOLDER;
  const tasks = getAllTasks();
  writeTasksSnapshot(
    task.group_folder,
    isMain,
    tasks.map((t) => ({
      id: t.id,
      groupFolder: t.group_folder,
      prompt: t.prompt,
      schedule_type: t.schedule_type,
      schedule_value: t.schedule_value,
      status: t.status,
      next_run: t.next_run,
    })),
  );

  let result: string | null = null;
  let error: string | null = null;

  // For group context mode, use the group's current session
  const sessions = deps.getSessions();
  const sessionId =
    task.context_mode === 'group' ? sessions[task.group_folder] : undefined;

  // Idle timer: closes the container's stdin after IDLE_TIMEOUT without a new
  // result, so the container exits instead of waiting for input forever.
  let idleTimer: ReturnType<typeof setTimeout> | null = null;

  const resetIdleTimer = () => {
    if (idleTimer) clearTimeout(idleTimer);
    idleTimer = setTimeout(() => {
      logger.debug({ taskId: task.id }, 'Scheduled task idle timeout, closing container stdin');
      deps.queue.closeStdin(task.chat_jid);
    }, IDLE_TIMEOUT);
  };

  try {
    const output = await runContainerAgent(
      group,
      {
        prompt: task.prompt,
        sessionId,
        groupFolder: task.group_folder,
        chatJid: task.chat_jid,
        isMain,
        isScheduledTask: true,
      },
      (proc, containerName) => deps.onProcess(task.chat_jid, proc, containerName, task.group_folder),
      async (streamedOutput: ContainerOutput) => {
        if (streamedOutput.result) {
          result = streamedOutput.result;
          // Forward result to user (sendMessage handles formatting)
          await deps.sendMessage(task.chat_jid, streamedOutput.result);
          // Only reset idle timer on actual results, not session-update markers
          resetIdleTimer();
        }
        if (streamedOutput.status === 'error') {
          error = streamedOutput.error || 'Unknown error';
        }
      },
    );

    if (output.status === 'error') {
      error = output.error || 'Unknown error';
    } else if (output.result) {
      // Keep the final result for the run log and summary; streamed results
      // were already forwarded to the chat by the callback above.
      result = output.result;
    }

    logger.info(
      { taskId: task.id, durationMs: Date.now() - startTime },
      'Task completed',
    );
  } catch (err) {
    error = err instanceof Error ? err.message : String(err);
    logger.error({ taskId: task.id, error }, 'Task failed');
  } finally {
    // Single cleanup point so the timer can never outlive the run.
    if (idleTimer) clearTimeout(idleTimer);
  }

  const durationMs = Date.now() - startTime;

  logTaskRun({
    task_id: task.id,
    run_at: new Date().toISOString(),
    duration_ms: durationMs,
    status: error ? 'error' : 'success',
    result,
    error,
  });

  // A malformed schedule must not prevent the task row from being updated.
  // NOTE(review): presumably a task whose next_run is never advanced stays due
  // and is picked up again on every poll — confirm against getDueTasks.
  let nextRun: string | null = null;
  let scheduleError: string | null = null;
  try {
    nextRun = computeNextRun(task);
  } catch (err) {
    scheduleError = err instanceof Error ? err.message : String(err);
    logger.error(
      {
        taskId: task.id,
        scheduleType: task.schedule_type,
        scheduleValue: task.schedule_value,
        error: scheduleError,
      },
      'Failed to compute next run for task',
    );
  }

  // A run error takes precedence; otherwise surface the scheduling problem so
  // the stored summary explains why the task has no next run.
  const failure = error ?? scheduleError;
  const resultSummary = failure
    ? `Error: ${failure}`
    : result
      ? result.slice(0, 200)
      : 'Completed';
  updateTaskAfterRun(task.id, nextRun, resultSummary);
}
|
|
|
|
/** Set once the poll loop has been started; guards against duplicate starts. */
let schedulerRunning = false;

/**
 * Start the scheduler poll loop.
 *
 * Each pass fetches the due tasks, re-reads every task by id, and enqueues
 * the ones still marked `'active'` on `deps.queue`. The next pass is scheduled
 * SCHEDULER_POLL_INTERVAL after the current one finishes, whether or not it
 * raised an error. Calling this function again after the loop has started is
 * a no-op.
 *
 * @param deps Host-provided collaborators handed through to `runTask`.
 */
export function startSchedulerLoop(deps: SchedulerDependencies): void {
  if (schedulerRunning) {
    logger.debug('Scheduler loop already running, skipping duplicate start');
    return;
  }
  schedulerRunning = true;
  logger.info('Scheduler loop started');

  // One polling pass: look up due tasks and hand the active ones to the queue.
  const enqueueDueTasks = (): void => {
    const due = getDueTasks();
    if (due.length > 0) {
      logger.info({ count: due.length }, 'Found due tasks');
    }

    for (const candidate of due) {
      // Re-check task status in case it was paused/cancelled
      const fresh = getTaskById(candidate.id);
      if (fresh && fresh.status === 'active') {
        deps.queue.enqueueTask(
          fresh.chat_jid,
          fresh.id,
          () => runTask(fresh, deps),
        );
      }
    }
  };

  const tick = async () => {
    try {
      enqueueDueTasks();
    } catch (err) {
      logger.error({ err }, 'Error in scheduler loop');
    }

    // Re-arm outside the try block so a failed pass never stops the loop.
    setTimeout(tick, SCHEDULER_POLL_INTERVAL);
  };

  tick();
}
|