Refactor index (#156)

* feat: add Telegram channel with agent swarm support

Add Telegram as a messaging channel that can run alongside WhatsApp
or standalone (TELEGRAM_ONLY mode). Includes bot pool support for
agent swarms where each subagent appears as a different bot identity
in the group.

- Add grammy dependency for Telegram Bot API
- Route messages through tg: JID prefix convention
- Add storeMessageDirect for non-Baileys channels
- Add sender field to IPC send_message for swarm identity
- Support TELEGRAM_BOT_TOKEN, TELEGRAM_ONLY, TELEGRAM_BOT_POOL config

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add index.ts refactor plan

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: extract channel abstraction, IPC, and router from index.ts

Break the 1088-line monolith into focused modules:
- src/channels/whatsapp.ts: WhatsAppChannel class implementing Channel interface
- src/ipc.ts: IPC watcher and task processing with dependency injection
- src/router.ts: message formatting, outbound routing, channel lookup
- src/types.ts: Channel interface, OnInboundMessage, OnChatMetadata types

Also adds regression test suite (98 tests), updates all documentation
and skill files to reflect the new architecture.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* ci: add test workflow for PRs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: remove accidentally committed pool-bot assets

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(ci): remove grammy from base dependencies

Grammy is installed by the /add-telegram skill, not a base dependency.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
gavrielc
2026-02-11 00:36:37 +02:00
committed by GitHub
parent 196abf67cf
commit 2b56fecfdc
28 changed files with 4273 additions and 1066 deletions

283
src/channels/whatsapp.ts Normal file
View File

@@ -0,0 +1,283 @@
import { exec } from 'child_process';
import fs from 'fs';
import path from 'path';
import makeWASocket, {
DisconnectReason,
WASocket,
makeCacheableSignalKeyStore,
useMultiFileAuthState,
} from '@whiskeysockets/baileys';
import { STORE_DIR } from '../config.js';
import {
getLastGroupSync,
setLastGroupSync,
updateChatName,
} from '../db.js';
import { logger } from '../logger.js';
import { Channel, OnInboundMessage, OnChatMetadata, RegisteredGroup } from '../types.js';
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
export interface WhatsAppChannelOpts {
onMessage: OnInboundMessage;
onChatMetadata: OnChatMetadata;
registeredGroups: () => Record<string, RegisteredGroup>;
}
export class WhatsAppChannel implements Channel {
name = 'whatsapp';
prefixAssistantName = true;
private sock!: WASocket;
private connected = false;
private lidToPhoneMap: Record<string, string> = {};
private outgoingQueue: Array<{ jid: string; text: string }> = [];
private flushing = false;
private groupSyncTimerStarted = false;
private opts: WhatsAppChannelOpts;
constructor(opts: WhatsAppChannelOpts) {
this.opts = opts;
}
async connect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.connectInternal(resolve).catch(reject);
});
}
private async connectInternal(onFirstOpen?: () => void): Promise<void> {
const authDir = path.join(STORE_DIR, 'auth');
fs.mkdirSync(authDir, { recursive: true });
const { state, saveCreds } = await useMultiFileAuthState(authDir);
this.sock = makeWASocket({
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
printQRInTerminal: false,
logger,
browser: ['NanoClaw', 'Chrome', '1.0.0'],
});
this.sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr) {
const msg =
'WhatsApp authentication required. Run /setup in Claude Code.';
logger.error(msg);
exec(
`osascript -e 'display notification "${msg}" with title "NanoClaw" sound name "Basso"'`,
);
setTimeout(() => process.exit(1), 1000);
}
if (connection === 'close') {
this.connected = false;
const reason = (lastDisconnect?.error as any)?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
logger.info({ reason, shouldReconnect, queuedMessages: this.outgoingQueue.length }, 'Connection closed');
if (shouldReconnect) {
logger.info('Reconnecting...');
this.connectInternal().catch((err) => {
logger.error({ err }, 'Failed to reconnect, retrying in 5s');
setTimeout(() => {
this.connectInternal().catch((err2) => {
logger.error({ err: err2 }, 'Reconnection retry failed');
});
}, 5000);
});
} else {
logger.info('Logged out. Run /setup to re-authenticate.');
process.exit(0);
}
} else if (connection === 'open') {
this.connected = true;
logger.info('Connected to WhatsApp');
// Build LID to phone mapping from auth state for self-chat translation
if (this.sock.user) {
const phoneUser = this.sock.user.id.split(':')[0];
const lidUser = this.sock.user.lid?.split(':')[0];
if (lidUser && phoneUser) {
this.lidToPhoneMap[lidUser] = `${phoneUser}@s.whatsapp.net`;
logger.debug({ lidUser, phoneUser }, 'LID to phone mapping set');
}
}
// Flush any messages queued while disconnected
this.flushOutgoingQueue().catch((err) =>
logger.error({ err }, 'Failed to flush outgoing queue'),
);
// Sync group metadata on startup (respects 24h cache)
this.syncGroupMetadata().catch((err) =>
logger.error({ err }, 'Initial group sync failed'),
);
// Set up daily sync timer (only once)
if (!this.groupSyncTimerStarted) {
this.groupSyncTimerStarted = true;
setInterval(() => {
this.syncGroupMetadata().catch((err) =>
logger.error({ err }, 'Periodic group sync failed'),
);
}, GROUP_SYNC_INTERVAL_MS);
}
// Signal first connection to caller
if (onFirstOpen) {
onFirstOpen();
onFirstOpen = undefined;
}
}
});
this.sock.ev.on('creds.update', saveCreds);
this.sock.ev.on('messages.upsert', ({ messages }) => {
for (const msg of messages) {
if (!msg.message) continue;
const rawJid = msg.key.remoteJid;
if (!rawJid || rawJid === 'status@broadcast') continue;
// Translate LID JID to phone JID if applicable
const chatJid = this.translateJid(rawJid);
const timestamp = new Date(
Number(msg.messageTimestamp) * 1000,
).toISOString();
// Always notify about chat metadata for group discovery
this.opts.onChatMetadata(chatJid, timestamp);
// Only deliver full message for registered groups
const groups = this.opts.registeredGroups();
if (groups[chatJid]) {
const content =
msg.message?.conversation ||
msg.message?.extendedTextMessage?.text ||
msg.message?.imageMessage?.caption ||
msg.message?.videoMessage?.caption ||
'';
const sender = msg.key.participant || msg.key.remoteJid || '';
const senderName = msg.pushName || sender.split('@')[0];
this.opts.onMessage(chatJid, {
id: msg.key.id || '',
chat_jid: chatJid,
sender,
sender_name: senderName,
content,
timestamp,
is_from_me: msg.key.fromMe || false,
});
}
}
});
}
async sendMessage(jid: string, text: string): Promise<void> {
if (!this.connected) {
this.outgoingQueue.push({ jid, text });
logger.info({ jid, length: text.length, queueSize: this.outgoingQueue.length }, 'WA disconnected, message queued');
return;
}
try {
await this.sock.sendMessage(jid, { text });
logger.info({ jid, length: text.length }, 'Message sent');
} catch (err) {
// If send fails, queue it for retry on reconnect
this.outgoingQueue.push({ jid, text });
logger.warn({ jid, err, queueSize: this.outgoingQueue.length }, 'Failed to send, message queued');
}
}
isConnected(): boolean {
return this.connected;
}
ownsJid(jid: string): boolean {
return jid.endsWith('@g.us') || jid.endsWith('@s.whatsapp.net');
}
async disconnect(): Promise<void> {
this.connected = false;
this.sock?.end(undefined);
}
async setTyping(jid: string, isTyping: boolean): Promise<void> {
try {
await this.sock.sendPresenceUpdate(isTyping ? 'composing' : 'paused', jid);
} catch (err) {
logger.debug({ jid, err }, 'Failed to update typing status');
}
}
/**
* Sync group metadata from WhatsApp.
* Fetches all participating groups and stores their names in the database.
* Called on startup, daily, and on-demand via IPC.
*/
async syncGroupMetadata(force = false): Promise<void> {
if (!force) {
const lastSync = getLastGroupSync();
if (lastSync) {
const lastSyncTime = new Date(lastSync).getTime();
if (Date.now() - lastSyncTime < GROUP_SYNC_INTERVAL_MS) {
logger.debug({ lastSync }, 'Skipping group sync - synced recently');
return;
}
}
}
try {
logger.info('Syncing group metadata from WhatsApp...');
const groups = await this.sock.groupFetchAllParticipating();
let count = 0;
for (const [jid, metadata] of Object.entries(groups)) {
if (metadata.subject) {
updateChatName(jid, metadata.subject);
count++;
}
}
setLastGroupSync();
logger.info({ count }, 'Group metadata synced');
} catch (err) {
logger.error({ err }, 'Failed to sync group metadata');
}
}
private translateJid(jid: string): string {
if (!jid.endsWith('@lid')) return jid;
const lidUser = jid.split('@')[0].split(':')[0];
const phoneJid = this.lidToPhoneMap[lidUser];
if (phoneJid) {
logger.debug({ lidJid: jid, phoneJid }, 'Translated LID to phone JID');
return phoneJid;
}
return jid;
}
private async flushOutgoingQueue(): Promise<void> {
if (this.flushing || this.outgoingQueue.length === 0) return;
this.flushing = true;
try {
logger.info({ count: this.outgoingQueue.length }, 'Flushing outgoing message queue');
while (this.outgoingQueue.length > 0) {
const item = this.outgoingQueue.shift()!;
await this.sendMessage(item.jid, item.text);
}
} finally {
this.flushing = false;
}
}
}

View File

@@ -1,6 +1,12 @@
import path from 'path';
export const ASSISTANT_NAME = process.env.ASSISTANT_NAME || 'Andy';
export const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN || '';
export const TELEGRAM_ONLY = process.env.TELEGRAM_ONLY === 'true';
export const TELEGRAM_BOT_POOL = (process.env.TELEGRAM_BOT_POOL || '')
.split(',')
.map((t) => t.trim())
.filter(Boolean);
export const POLL_INTERVAL = 2000;
export const SCHEDULER_POLL_INTERVAL = 60000;

315
src/db.test.ts Normal file
View File

@@ -0,0 +1,315 @@
import { describe, it, expect, beforeEach } from 'vitest';
import {
_initTestDatabase,
createTask,
deleteTask,
getAllChats,
getMessagesSince,
getNewMessages,
getTaskById,
storeChatMetadata,
storeMessage,
updateTask,
} from './db.js';
beforeEach(() => {
_initTestDatabase();
});
// Helper to store a message using the normalized NewMessage interface
function store(overrides: {
id: string;
chat_jid: string;
sender: string;
sender_name: string;
content: string;
timestamp: string;
is_from_me?: boolean;
}) {
storeMessage({
id: overrides.id,
chat_jid: overrides.chat_jid,
sender: overrides.sender,
sender_name: overrides.sender_name,
content: overrides.content,
timestamp: overrides.timestamp,
is_from_me: overrides.is_from_me ?? false,
});
}
// --- storeMessage (NewMessage format) ---
describe('storeMessage', () => {
it('stores a message and retrieves it', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
store({
id: 'msg-1',
chat_jid: 'group@g.us',
sender: '123@s.whatsapp.net',
sender_name: 'Alice',
content: 'hello world',
timestamp: '2024-01-01T00:00:01.000Z',
});
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'BotName');
expect(messages).toHaveLength(1);
expect(messages[0].id).toBe('msg-1');
expect(messages[0].sender).toBe('123@s.whatsapp.net');
expect(messages[0].sender_name).toBe('Alice');
expect(messages[0].content).toBe('hello world');
});
it('stores empty content', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
store({
id: 'msg-2',
chat_jid: 'group@g.us',
sender: '111@s.whatsapp.net',
sender_name: 'Dave',
content: '',
timestamp: '2024-01-01T00:00:04.000Z',
});
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'BotName');
expect(messages).toHaveLength(1);
expect(messages[0].content).toBe('');
});
it('stores is_from_me flag', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
store({
id: 'msg-3',
chat_jid: 'group@g.us',
sender: 'me@s.whatsapp.net',
sender_name: 'Me',
content: 'my message',
timestamp: '2024-01-01T00:00:05.000Z',
is_from_me: true,
});
// Message is stored (we can retrieve it — is_from_me doesn't affect retrieval)
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'BotName');
expect(messages).toHaveLength(1);
});
it('upserts on duplicate id+chat_jid', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
store({
id: 'msg-dup',
chat_jid: 'group@g.us',
sender: '123@s.whatsapp.net',
sender_name: 'Alice',
content: 'original',
timestamp: '2024-01-01T00:00:01.000Z',
});
store({
id: 'msg-dup',
chat_jid: 'group@g.us',
sender: '123@s.whatsapp.net',
sender_name: 'Alice',
content: 'updated',
timestamp: '2024-01-01T00:00:01.000Z',
});
const messages = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'BotName');
expect(messages).toHaveLength(1);
expect(messages[0].content).toBe('updated');
});
});
// --- getMessagesSince ---
describe('getMessagesSince', () => {
beforeEach(() => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
const msgs = [
{ id: 'm1', content: 'first', ts: '2024-01-01T00:00:01.000Z', sender: 'Alice' },
{ id: 'm2', content: 'second', ts: '2024-01-01T00:00:02.000Z', sender: 'Bob' },
{ id: 'm3', content: 'Andy: bot reply', ts: '2024-01-01T00:00:03.000Z', sender: 'Bot' },
{ id: 'm4', content: 'third', ts: '2024-01-01T00:00:04.000Z', sender: 'Carol' },
];
for (const m of msgs) {
store({
id: m.id,
chat_jid: 'group@g.us',
sender: `${m.sender}@s.whatsapp.net`,
sender_name: m.sender,
content: m.content,
timestamp: m.ts,
});
}
});
it('returns messages after the given timestamp', () => {
const msgs = getMessagesSince('group@g.us', '2024-01-01T00:00:02.000Z', 'Andy');
// Should exclude m1, m2 (before/at timestamp), m3 (bot message)
expect(msgs).toHaveLength(1);
expect(msgs[0].content).toBe('third');
});
it('excludes messages from the assistant (content prefix)', () => {
const msgs = getMessagesSince('group@g.us', '2024-01-01T00:00:00.000Z', 'Andy');
const botMsgs = msgs.filter((m) => m.content.startsWith('Andy:'));
expect(botMsgs).toHaveLength(0);
});
it('returns all messages when sinceTimestamp is empty', () => {
const msgs = getMessagesSince('group@g.us', '', 'Andy');
// 3 user messages (bot message excluded)
expect(msgs).toHaveLength(3);
});
});
// --- getNewMessages ---
describe('getNewMessages', () => {
beforeEach(() => {
storeChatMetadata('group1@g.us', '2024-01-01T00:00:00.000Z');
storeChatMetadata('group2@g.us', '2024-01-01T00:00:00.000Z');
const msgs = [
{ id: 'a1', chat: 'group1@g.us', content: 'g1 msg1', ts: '2024-01-01T00:00:01.000Z' },
{ id: 'a2', chat: 'group2@g.us', content: 'g2 msg1', ts: '2024-01-01T00:00:02.000Z' },
{ id: 'a3', chat: 'group1@g.us', content: 'Andy: reply', ts: '2024-01-01T00:00:03.000Z' },
{ id: 'a4', chat: 'group1@g.us', content: 'g1 msg2', ts: '2024-01-01T00:00:04.000Z' },
];
for (const m of msgs) {
store({
id: m.id,
chat_jid: m.chat,
sender: 'user@s.whatsapp.net',
sender_name: 'User',
content: m.content,
timestamp: m.ts,
});
}
});
it('returns new messages across multiple groups', () => {
const { messages, newTimestamp } = getNewMessages(
['group1@g.us', 'group2@g.us'],
'2024-01-01T00:00:00.000Z',
'Andy',
);
// Excludes 'Andy: reply', returns 3 messages
expect(messages).toHaveLength(3);
expect(newTimestamp).toBe('2024-01-01T00:00:04.000Z');
});
it('filters by timestamp', () => {
const { messages } = getNewMessages(
['group1@g.us', 'group2@g.us'],
'2024-01-01T00:00:02.000Z',
'Andy',
);
// Only g1 msg2 (after ts, not bot)
expect(messages).toHaveLength(1);
expect(messages[0].content).toBe('g1 msg2');
});
it('returns empty for no registered groups', () => {
const { messages, newTimestamp } = getNewMessages([], '', 'Andy');
expect(messages).toHaveLength(0);
expect(newTimestamp).toBe('');
});
});
// --- storeChatMetadata ---
describe('storeChatMetadata', () => {
it('stores chat with JID as default name', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
const chats = getAllChats();
expect(chats).toHaveLength(1);
expect(chats[0].jid).toBe('group@g.us');
expect(chats[0].name).toBe('group@g.us');
});
it('stores chat with explicit name', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z', 'My Group');
const chats = getAllChats();
expect(chats[0].name).toBe('My Group');
});
it('updates name on subsequent call with name', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:00.000Z');
storeChatMetadata('group@g.us', '2024-01-01T00:00:01.000Z', 'Updated Name');
const chats = getAllChats();
expect(chats).toHaveLength(1);
expect(chats[0].name).toBe('Updated Name');
});
it('preserves newer timestamp on conflict', () => {
storeChatMetadata('group@g.us', '2024-01-01T00:00:05.000Z');
storeChatMetadata('group@g.us', '2024-01-01T00:00:01.000Z');
const chats = getAllChats();
expect(chats[0].last_message_time).toBe('2024-01-01T00:00:05.000Z');
});
});
// --- Task CRUD ---
describe('task CRUD', () => {
it('creates and retrieves a task', () => {
createTask({
id: 'task-1',
group_folder: 'main',
chat_jid: 'group@g.us',
prompt: 'do something',
schedule_type: 'once',
schedule_value: '2024-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2024-06-01T00:00:00.000Z',
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
const task = getTaskById('task-1');
expect(task).toBeDefined();
expect(task!.prompt).toBe('do something');
expect(task!.status).toBe('active');
});
it('updates task status', () => {
createTask({
id: 'task-2',
group_folder: 'main',
chat_jid: 'group@g.us',
prompt: 'test',
schedule_type: 'once',
schedule_value: '2024-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
updateTask('task-2', { status: 'paused' });
expect(getTaskById('task-2')!.status).toBe('paused');
});
it('deletes a task and its run logs', () => {
createTask({
id: 'task-3',
group_folder: 'main',
chat_jid: 'group@g.us',
prompt: 'delete me',
schedule_type: 'once',
schedule_value: '2024-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
deleteTask('task-3');
expect(getTaskById('task-3')).toBeUndefined();
});
});

121
src/db.ts
View File

@@ -2,19 +2,13 @@ import Database from 'better-sqlite3';
import fs from 'fs';
import path from 'path';
import { proto } from '@whiskeysockets/baileys';
import { DATA_DIR, STORE_DIR } from './config.js';
import { NewMessage, RegisteredGroup, ScheduledTask, TaskRunLog } from './types.js';
let db: Database.Database;
export function initDatabase(): void {
const dbPath = path.join(STORE_DIR, 'messages.db');
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
db = new Database(dbPath);
db.exec(`
function createSchema(database: Database.Database): void {
database.exec(`
CREATE TABLE IF NOT EXISTS chats (
jid TEXT PRIMARY KEY,
name TEXT,
@@ -60,35 +54,7 @@ export function initDatabase(): void {
FOREIGN KEY (task_id) REFERENCES scheduled_tasks(id)
);
CREATE INDEX IF NOT EXISTS idx_task_run_logs ON task_run_logs(task_id, run_at);
`);
// Add sender_name column if it doesn't exist (migration for existing DBs)
try {
db.exec(`ALTER TABLE messages ADD COLUMN sender_name TEXT`);
} catch {
/* column already exists */
}
// Add context_mode column if it doesn't exist (migration for existing DBs)
try {
db.exec(
`ALTER TABLE scheduled_tasks ADD COLUMN context_mode TEXT DEFAULT 'isolated'`,
);
} catch {
/* column already exists */
}
// Add requires_trigger column if it doesn't exist (migration for existing DBs)
try {
db.exec(
`ALTER TABLE registered_groups ADD COLUMN requires_trigger INTEGER DEFAULT 1`,
);
} catch {
/* column already exists */
}
// State tables (replacing JSON files)
db.exec(`
CREATE TABLE IF NOT EXISTS router_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
@@ -108,10 +74,33 @@ export function initDatabase(): void {
);
`);
// Add context_mode column if it doesn't exist (migration for existing DBs)
try {
database.exec(
`ALTER TABLE scheduled_tasks ADD COLUMN context_mode TEXT DEFAULT 'isolated'`,
);
} catch {
/* column already exists */
}
}
export function initDatabase(): void {
const dbPath = path.join(STORE_DIR, 'messages.db');
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
db = new Database(dbPath);
createSchema(db);
// Migrate from JSON files if they exist
migrateJsonState();
}
/** @internal - for tests only. Creates a fresh in-memory database. */
export function _initTestDatabase(): void {
db = new Database(':memory:');
createSchema(db);
}
/**
* Store chat metadata only (no message content).
* Used for all chats to enable group discovery without storing sensitive content.
@@ -203,36 +192,42 @@ export function setLastGroupSync(): void {
* Store a message with full content.
* Only call this for registered groups where message history is needed.
*/
export function storeMessage(
msg: proto.IWebMessageInfo,
chatJid: string,
isFromMe: boolean,
pushName?: string,
): void {
if (!msg.key) return;
const content =
msg.message?.conversation ||
msg.message?.extendedTextMessage?.text ||
msg.message?.imageMessage?.caption ||
msg.message?.videoMessage?.caption ||
'';
const timestamp = new Date(Number(msg.messageTimestamp) * 1000).toISOString();
const sender = msg.key.participant || msg.key.remoteJid || '';
const senderName = pushName || sender.split('@')[0];
const msgId = msg.key.id || '';
export function storeMessage(msg: NewMessage): void {
db.prepare(
`INSERT OR REPLACE INTO messages (id, chat_jid, sender, sender_name, content, timestamp, is_from_me) VALUES (?, ?, ?, ?, ?, ?, ?)`,
).run(
msgId,
chatJid,
sender,
senderName,
content,
timestamp,
isFromMe ? 1 : 0,
msg.id,
msg.chat_jid,
msg.sender,
msg.sender_name,
msg.content,
msg.timestamp,
msg.is_from_me ? 1 : 0,
);
}
/**
* Store a message directly (for non-WhatsApp channels that don't use Baileys proto).
*/
export function storeMessageDirect(msg: {
id: string;
chat_jid: string;
sender: string;
sender_name: string;
content: string;
timestamp: string;
is_from_me: boolean;
}): void {
db.prepare(
`INSERT OR REPLACE INTO messages (id, chat_jid, sender, sender_name, content, timestamp, is_from_me) VALUES (?, ?, ?, ?, ?, ?, ?)`,
).run(
msg.id,
msg.chat_jid,
msg.sender,
msg.sender_name,
msg.content,
msg.timestamp,
msg.is_from_me ? 1 : 0,
);
}

246
src/formatting.test.ts Normal file
View File

@@ -0,0 +1,246 @@
import { describe, it, expect } from 'vitest';
import { ASSISTANT_NAME, TRIGGER_PATTERN } from './config.js';
import {
escapeXml,
formatMessages,
formatOutbound,
stripInternalTags,
} from './router.js';
import { Channel, NewMessage } from './types.js';
function makeMsg(overrides: Partial<NewMessage> = {}): NewMessage {
return {
id: '1',
chat_jid: 'group@g.us',
sender: '123@s.whatsapp.net',
sender_name: 'Alice',
content: 'hello',
timestamp: '2024-01-01T00:00:00.000Z',
...overrides,
};
}
// --- escapeXml ---
describe('escapeXml', () => {
it('escapes ampersands', () => {
expect(escapeXml('a & b')).toBe('a &amp; b');
});
it('escapes less-than', () => {
expect(escapeXml('a < b')).toBe('a &lt; b');
});
it('escapes greater-than', () => {
expect(escapeXml('a > b')).toBe('a &gt; b');
});
it('escapes double quotes', () => {
expect(escapeXml('"hello"')).toBe('&quot;hello&quot;');
});
it('handles multiple special characters together', () => {
expect(escapeXml('a & b < c > d "e"')).toBe(
'a &amp; b &lt; c &gt; d &quot;e&quot;',
);
});
it('passes through strings with no special chars', () => {
expect(escapeXml('hello world')).toBe('hello world');
});
it('handles empty string', () => {
expect(escapeXml('')).toBe('');
});
});
// --- formatMessages ---
describe('formatMessages', () => {
it('formats a single message as XML', () => {
const result = formatMessages([makeMsg()]);
expect(result).toBe(
'<messages>\n' +
'<message sender="Alice" time="2024-01-01T00:00:00.000Z">hello</message>\n' +
'</messages>',
);
});
it('formats multiple messages', () => {
const msgs = [
makeMsg({ id: '1', sender_name: 'Alice', content: 'hi', timestamp: 't1' }),
makeMsg({ id: '2', sender_name: 'Bob', content: 'hey', timestamp: 't2' }),
];
const result = formatMessages(msgs);
expect(result).toContain('sender="Alice"');
expect(result).toContain('sender="Bob"');
expect(result).toContain('>hi</message>');
expect(result).toContain('>hey</message>');
});
it('escapes special characters in sender names', () => {
const result = formatMessages([makeMsg({ sender_name: 'A & B <Co>' })]);
expect(result).toContain('sender="A &amp; B &lt;Co&gt;"');
});
it('escapes special characters in content', () => {
const result = formatMessages([
makeMsg({ content: '<script>alert("xss")</script>' }),
]);
expect(result).toContain(
'&lt;script&gt;alert(&quot;xss&quot;)&lt;/script&gt;',
);
});
it('handles empty array', () => {
const result = formatMessages([]);
expect(result).toBe('<messages>\n\n</messages>');
});
});
// --- TRIGGER_PATTERN ---
describe('TRIGGER_PATTERN', () => {
it('matches @Andy at start of message', () => {
expect(TRIGGER_PATTERN.test('@Andy hello')).toBe(true);
});
it('matches case-insensitively', () => {
expect(TRIGGER_PATTERN.test('@andy hello')).toBe(true);
expect(TRIGGER_PATTERN.test('@ANDY hello')).toBe(true);
});
it('does not match when not at start of message', () => {
expect(TRIGGER_PATTERN.test('hello @Andy')).toBe(false);
});
it('does not match partial name like @Andrew (word boundary)', () => {
expect(TRIGGER_PATTERN.test('@Andrew hello')).toBe(false);
});
it('matches with word boundary before apostrophe', () => {
expect(TRIGGER_PATTERN.test("@Andy's thing")).toBe(true);
});
it('matches @Andy alone (end of string is a word boundary)', () => {
expect(TRIGGER_PATTERN.test('@Andy')).toBe(true);
});
it('matches with leading whitespace after trim', () => {
// The actual usage trims before testing: TRIGGER_PATTERN.test(m.content.trim())
expect(TRIGGER_PATTERN.test('@Andy hey'.trim())).toBe(true);
});
});
// --- Outbound formatting (internal tag stripping + prefix) ---
describe('stripInternalTags', () => {
it('strips single-line internal tags', () => {
expect(stripInternalTags('hello <internal>secret</internal> world')).toBe(
'hello world',
);
});
it('strips multi-line internal tags', () => {
expect(
stripInternalTags('hello <internal>\nsecret\nstuff\n</internal> world'),
).toBe('hello world');
});
it('strips multiple internal tag blocks', () => {
expect(
stripInternalTags(
'<internal>a</internal>hello<internal>b</internal>',
),
).toBe('hello');
});
it('returns empty string when text is only internal tags', () => {
expect(stripInternalTags('<internal>only this</internal>')).toBe('');
});
});
describe('formatOutbound', () => {
const waChannel = { prefixAssistantName: true } as Channel;
const noPrefixChannel = { prefixAssistantName: false } as Channel;
const defaultChannel = {} as Channel;
it('prefixes with assistant name when channel wants it', () => {
expect(formatOutbound(waChannel, 'hello world')).toBe(
`${ASSISTANT_NAME}: hello world`,
);
});
it('does not prefix when channel opts out', () => {
expect(formatOutbound(noPrefixChannel, 'hello world')).toBe('hello world');
});
it('defaults to prefixing when prefixAssistantName is undefined', () => {
expect(formatOutbound(defaultChannel, 'hello world')).toBe(
`${ASSISTANT_NAME}: hello world`,
);
});
it('returns empty string when all text is internal', () => {
expect(formatOutbound(waChannel, '<internal>hidden</internal>')).toBe('');
});
it('strips internal tags and prefixes remaining text', () => {
expect(
formatOutbound(waChannel, '<internal>thinking</internal>The answer is 42'),
).toBe(`${ASSISTANT_NAME}: The answer is 42`);
});
});
// --- Trigger gating with requiresTrigger flag ---
describe('trigger gating (requiresTrigger interaction)', () => {
// Replicates the exact logic from processGroupMessages and startMessageLoop:
// if (!isMainGroup && group.requiresTrigger !== false) { check trigger }
function shouldRequireTrigger(
isMainGroup: boolean,
requiresTrigger: boolean | undefined,
): boolean {
return !isMainGroup && requiresTrigger !== false;
}
function shouldProcess(
isMainGroup: boolean,
requiresTrigger: boolean | undefined,
messages: NewMessage[],
): boolean {
if (!shouldRequireTrigger(isMainGroup, requiresTrigger)) return true;
return messages.some((m) => TRIGGER_PATTERN.test(m.content.trim()));
}
it('main group always processes (no trigger needed)', () => {
const msgs = [makeMsg({ content: 'hello no trigger' })];
expect(shouldProcess(true, undefined, msgs)).toBe(true);
});
it('main group processes even with requiresTrigger=true', () => {
const msgs = [makeMsg({ content: 'hello no trigger' })];
expect(shouldProcess(true, true, msgs)).toBe(true);
});
it('non-main group with requiresTrigger=undefined requires trigger (defaults to true)', () => {
const msgs = [makeMsg({ content: 'hello no trigger' })];
expect(shouldProcess(false, undefined, msgs)).toBe(false);
});
it('non-main group with requiresTrigger=true requires trigger', () => {
const msgs = [makeMsg({ content: 'hello no trigger' })];
expect(shouldProcess(false, true, msgs)).toBe(false);
});
it('non-main group with requiresTrigger=true processes when trigger present', () => {
const msgs = [makeMsg({ content: '@Andy do something' })];
expect(shouldProcess(false, true, msgs)).toBe(true);
});
it('non-main group with requiresTrigger=false always processes (no trigger needed)', () => {
const msgs = [makeMsg({ content: 'hello no trigger' })];
expect(shouldProcess(false, false, msgs)).toBe(true);
});
});

245
src/group-queue.test.ts Normal file
View File

@@ -0,0 +1,245 @@
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { GroupQueue } from './group-queue.js';
// Mock config to control concurrency limit
vi.mock('./config.js', () => ({
DATA_DIR: '/tmp/nanoclaw-test-data',
MAX_CONCURRENT_CONTAINERS: 2,
}));
// Mock fs operations used by sendMessage/closeStdin
vi.mock('fs', async () => {
const actual = await vi.importActual<typeof import('fs')>('fs');
return {
...actual,
default: {
...actual,
mkdirSync: vi.fn(),
writeFileSync: vi.fn(),
renameSync: vi.fn(),
},
};
});
describe('GroupQueue', () => {
let queue: GroupQueue;
beforeEach(() => {
vi.useFakeTimers();
queue = new GroupQueue();
});
afterEach(() => {
vi.useRealTimers();
});
// --- Single group at a time ---
it('only runs one container per group at a time', async () => {
let concurrentCount = 0;
let maxConcurrent = 0;
const processMessages = vi.fn(async (groupJid: string) => {
concurrentCount++;
maxConcurrent = Math.max(maxConcurrent, concurrentCount);
// Simulate async work
await new Promise((resolve) => setTimeout(resolve, 100));
concurrentCount--;
return true;
});
queue.setProcessMessagesFn(processMessages);
// Enqueue two messages for the same group
queue.enqueueMessageCheck('group1@g.us');
queue.enqueueMessageCheck('group1@g.us');
// Advance timers to let the first process complete
await vi.advanceTimersByTimeAsync(200);
// Second enqueue should have been queued, not concurrent
expect(maxConcurrent).toBe(1);
});
// --- Global concurrency limit ---
it('respects global concurrency limit', async () => {
let activeCount = 0;
let maxActive = 0;
const completionCallbacks: Array<() => void> = [];
const processMessages = vi.fn(async (groupJid: string) => {
activeCount++;
maxActive = Math.max(maxActive, activeCount);
await new Promise<void>((resolve) => completionCallbacks.push(resolve));
activeCount--;
return true;
});
queue.setProcessMessagesFn(processMessages);
// Enqueue 3 groups (limit is 2)
queue.enqueueMessageCheck('group1@g.us');
queue.enqueueMessageCheck('group2@g.us');
queue.enqueueMessageCheck('group3@g.us');
// Let promises settle
await vi.advanceTimersByTimeAsync(10);
// Only 2 should be active (MAX_CONCURRENT_CONTAINERS = 2)
expect(maxActive).toBe(2);
expect(activeCount).toBe(2);
// Complete one — third should start
completionCallbacks[0]();
await vi.advanceTimersByTimeAsync(10);
expect(processMessages).toHaveBeenCalledTimes(3);
});
// --- Tasks prioritized over messages ---
it('drains tasks before messages for same group', async () => {
const executionOrder: string[] = [];
let resolveFirst: () => void;
const processMessages = vi.fn(async (groupJid: string) => {
if (executionOrder.length === 0) {
// First call: block until we release it
await new Promise<void>((resolve) => {
resolveFirst = resolve;
});
}
executionOrder.push('messages');
return true;
});
queue.setProcessMessagesFn(processMessages);
// Start processing messages (takes the active slot)
queue.enqueueMessageCheck('group1@g.us');
await vi.advanceTimersByTimeAsync(10);
// While active, enqueue both a task and pending messages
const taskFn = vi.fn(async () => {
executionOrder.push('task');
});
queue.enqueueTask('group1@g.us', 'task-1', taskFn);
queue.enqueueMessageCheck('group1@g.us');
// Release the first processing
resolveFirst!();
await vi.advanceTimersByTimeAsync(10);
// Task should have run before the second message check
expect(executionOrder[0]).toBe('messages'); // first call
expect(executionOrder[1]).toBe('task'); // task runs first in drain
// Messages would run after task completes
});
// --- Retry with backoff on failure ---
it('retries with exponential backoff on failure', async () => {
let callCount = 0;
const processMessages = vi.fn(async () => {
callCount++;
return false; // failure
});
queue.setProcessMessagesFn(processMessages);
queue.enqueueMessageCheck('group1@g.us');
// First call happens immediately
await vi.advanceTimersByTimeAsync(10);
expect(callCount).toBe(1);
// First retry after 5000ms (BASE_RETRY_MS * 2^0)
await vi.advanceTimersByTimeAsync(5000);
await vi.advanceTimersByTimeAsync(10);
expect(callCount).toBe(2);
// Second retry after 10000ms (BASE_RETRY_MS * 2^1)
await vi.advanceTimersByTimeAsync(10000);
await vi.advanceTimersByTimeAsync(10);
expect(callCount).toBe(3);
});
// --- Shutdown prevents new enqueues ---
it('prevents new enqueues after shutdown', async () => {
const processMessages = vi.fn(async () => true);
queue.setProcessMessagesFn(processMessages);
await queue.shutdown(1000);
queue.enqueueMessageCheck('group1@g.us');
await vi.advanceTimersByTimeAsync(100);
expect(processMessages).not.toHaveBeenCalled();
});
// --- Max retries exceeded ---
it('stops retrying after MAX_RETRIES and resets', async () => {
let callCount = 0;
const processMessages = vi.fn(async () => {
callCount++;
return false; // always fail
});
queue.setProcessMessagesFn(processMessages);
queue.enqueueMessageCheck('group1@g.us');
// Run through all 5 retries (MAX_RETRIES = 5)
// Initial call
await vi.advanceTimersByTimeAsync(10);
expect(callCount).toBe(1);
// Retry 1: 5000ms, Retry 2: 10000ms, Retry 3: 20000ms, Retry 4: 40000ms, Retry 5: 80000ms
const retryDelays = [5000, 10000, 20000, 40000, 80000];
for (let i = 0; i < retryDelays.length; i++) {
await vi.advanceTimersByTimeAsync(retryDelays[i] + 10);
expect(callCount).toBe(i + 2);
}
// After 5 retries (6 total calls), should stop — no more retries
const countAfterMaxRetries = callCount;
await vi.advanceTimersByTimeAsync(200000); // Wait a long time
expect(callCount).toBe(countAfterMaxRetries);
});
// --- Waiting groups get drained when slots free up ---
it('drains waiting groups when active slots free up', async () => {
const processed: string[] = [];
const completionCallbacks: Array<() => void> = [];
const processMessages = vi.fn(async (groupJid: string) => {
processed.push(groupJid);
await new Promise<void>((resolve) => completionCallbacks.push(resolve));
return true;
});
queue.setProcessMessagesFn(processMessages);
// Fill both slots
queue.enqueueMessageCheck('group1@g.us');
queue.enqueueMessageCheck('group2@g.us');
await vi.advanceTimersByTimeAsync(10);
// Queue a third
queue.enqueueMessageCheck('group3@g.us');
await vi.advanceTimersByTimeAsync(10);
expect(processed).toEqual(['group1@g.us', 'group2@g.us']);
// Free up a slot
completionCallbacks[0]();
await vi.advanceTimersByTimeAsync(10);
expect(processed).toContain('group3@g.us');
});
});

View File

@@ -1,104 +1,57 @@
import { exec, execSync } from 'child_process';
import { execSync } from 'child_process';
import fs from 'fs';
import path from 'path';
import makeWASocket, {
DisconnectReason,
WASocket,
makeCacheableSignalKeyStore,
useMultiFileAuthState,
} from '@whiskeysockets/baileys';
import { CronExpressionParser } from 'cron-parser';
import {
ASSISTANT_NAME,
DATA_DIR,
IDLE_TIMEOUT,
IPC_POLL_INTERVAL,
MAIN_GROUP_FOLDER,
POLL_INTERVAL,
STORE_DIR,
TIMEZONE,
TRIGGER_PATTERN,
} from './config.js';
import { WhatsAppChannel } from './channels/whatsapp.js';
import {
AvailableGroup,
ContainerOutput,
runContainerAgent,
writeGroupsSnapshot,
writeTasksSnapshot,
} from './container-runner.js';
import {
createTask,
deleteTask,
getAllChats,
getAllRegisteredGroups,
getAllSessions,
getAllTasks,
getLastGroupSync,
getMessagesSince,
getNewMessages,
getRouterState,
getTaskById,
initDatabase,
setLastGroupSync,
setRegisteredGroup,
setRouterState,
setSession,
storeChatMetadata,
storeMessage,
updateChatName,
updateTask,
} from './db.js';
import { GroupQueue } from './group-queue.js';
import { startIpcWatcher } from './ipc.js';
import { formatMessages, formatOutbound } from './router.js';
import { startSchedulerLoop } from './task-scheduler.js';
import { NewMessage, RegisteredGroup } from './types.js';
import { logger } from './logger.js';
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
// Re-export for backwards compatibility during refactor
export { escapeXml, formatMessages } from './router.js';
let sock: WASocket;
let lastTimestamp = '';
let sessions: Record<string, string> = {};
let registeredGroups: Record<string, RegisteredGroup> = {};
let lastAgentTimestamp: Record<string, string> = {};
// LID to phone number mapping (WhatsApp now sends LID JIDs for self-chats)
let lidToPhoneMap: Record<string, string> = {};
// Guards to prevent duplicate loops on WhatsApp reconnect
let messageLoopRunning = false;
let ipcWatcherRunning = false;
let groupSyncTimerStarted = false;
// WhatsApp connection state and outgoing message queue
let waConnected = false;
const outgoingQueue: Array<{ jid: string; text: string }> = [];
let whatsapp: WhatsAppChannel;
const queue = new GroupQueue();
/**
* Translate a JID from LID format to phone format if we have a mapping.
* Returns the original JID if no mapping exists.
*/
function translateJid(jid: string): string {
if (!jid.endsWith('@lid')) return jid;
const lidUser = jid.split('@')[0].split(':')[0];
const phoneJid = lidToPhoneMap[lidUser];
if (phoneJid) {
logger.debug({ lidJid: jid, phoneJid }, 'Translated LID to phone JID');
return phoneJid;
}
return jid;
}
async function setTyping(jid: string, isTyping: boolean): Promise<void> {
try {
await sock.sendPresenceUpdate(isTyping ? 'composing' : 'paused', jid);
} catch (err) {
logger.debug({ jid, err }, 'Failed to update typing status');
}
}
function loadState(): void {
// Load from SQLite (migration from JSON happens in initDatabase)
lastTimestamp = getRouterState('last_timestamp') || '';
const agentTs = getRouterState('last_agent_timestamp');
try {
@@ -137,49 +90,11 @@ function registerGroup(jid: string, group: RegisteredGroup): void {
);
}
/**
* Sync group metadata from WhatsApp.
* Fetches all participating groups and stores their names in the database.
* Called on startup, daily, and on-demand via IPC.
*/
async function syncGroupMetadata(force = false): Promise<void> {
// Check if we need to sync (skip if synced recently, unless forced)
if (!force) {
const lastSync = getLastGroupSync();
if (lastSync) {
const lastSyncTime = new Date(lastSync).getTime();
const now = Date.now();
if (now - lastSyncTime < GROUP_SYNC_INTERVAL_MS) {
logger.debug({ lastSync }, 'Skipping group sync - synced recently');
return;
}
}
}
try {
logger.info('Syncing group metadata from WhatsApp...');
const groups = await sock.groupFetchAllParticipating();
let count = 0;
for (const [jid, metadata] of Object.entries(groups)) {
if (metadata.subject) {
updateChatName(jid, metadata.subject);
count++;
}
}
setLastGroupSync();
logger.info({ count }, 'Group metadata synced');
} catch (err) {
logger.error({ err }, 'Failed to sync group metadata');
}
}
/**
* Get available groups list for the agent.
* Returns groups ordered by most recent activity.
*/
function getAvailableGroups(): AvailableGroup[] {
export function getAvailableGroups(): import('./container-runner.js').AvailableGroup[] {
const chats = getAllChats();
const registeredJids = new Set(Object.keys(registeredGroups));
@@ -193,28 +108,14 @@ function getAvailableGroups(): AvailableGroup[] {
}));
}
function escapeXml(s: string): string {
return s
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
}
function formatMessages(messages: NewMessage[]): string {
const lines = messages.map((m) =>
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`,
);
return `<messages>\n${lines.join('\n')}\n</messages>`;
/** @internal - exported for testing */
export function _setRegisteredGroups(groups: Record<string, RegisteredGroup>): void {
registeredGroups = groups;
}
/**
* Process all pending messages for a group.
* Called by the GroupQueue when it's this group's turn.
*
* Uses streaming output: agent results are sent to WhatsApp as they arrive.
* The container stays alive for IDLE_TIMEOUT after each result, allowing
* rapid-fire messages to be piped in without spawning a new container.
*/
async function processGroupMessages(chatJid: string): Promise<boolean> {
const group = registeredGroups[chatJid];
@@ -222,7 +123,6 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
// Get all messages since last agent interaction
const sinceTimestamp = lastAgentTimestamp[chatJid] || '';
const missedMessages = getMessagesSince(
chatJid,
@@ -265,7 +165,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
}, IDLE_TIMEOUT);
};
await setTyping(chatJid, true);
await whatsapp.setTyping(chatJid, true);
let hadError = false;
const output = await runAgent(group, prompt, chatJid, async (result) => {
@@ -276,7 +176,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
const text = raw.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`);
if (text) {
await sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`);
await whatsapp.sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`);
}
// Only reset idle timer on actual results, not session-update markers (result: null)
resetIdleTimer();
@@ -287,7 +187,7 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
}
});
await setTyping(chatJid, false);
await whatsapp.setTyping(chatJid, false);
if (idleTimer) clearTimeout(idleTimer);
if (output === 'error' || hadError) {
@@ -380,511 +280,6 @@ async function runAgent(
}
}
async function sendMessage(jid: string, text: string): Promise<void> {
if (!waConnected) {
outgoingQueue.push({ jid, text });
logger.info({ jid, length: text.length, queueSize: outgoingQueue.length }, 'WA disconnected, message queued');
return;
}
try {
await sock.sendMessage(jid, { text });
logger.info({ jid, length: text.length }, 'Message sent');
} catch (err) {
// If send fails, queue it for retry on reconnect
outgoingQueue.push({ jid, text });
logger.warn({ jid, err, queueSize: outgoingQueue.length }, 'Failed to send, message queued');
}
}
let flushing = false;
async function flushOutgoingQueue(): Promise<void> {
if (flushing || outgoingQueue.length === 0) return;
flushing = true;
try {
logger.info({ count: outgoingQueue.length }, 'Flushing outgoing message queue');
// Process one at a time — sendMessage re-queues on failure internally.
// Shift instead of splice so unattempted messages stay in the queue
// if an unexpected error occurs.
while (outgoingQueue.length > 0) {
const item = outgoingQueue.shift()!;
await sendMessage(item.jid, item.text);
}
} finally {
flushing = false;
}
}
function startIpcWatcher(): void {
if (ipcWatcherRunning) {
logger.debug('IPC watcher already running, skipping duplicate start');
return;
}
ipcWatcherRunning = true;
const ipcBaseDir = path.join(DATA_DIR, 'ipc');
fs.mkdirSync(ipcBaseDir, { recursive: true });
const processIpcFiles = async () => {
// Scan all group IPC directories (identity determined by directory)
let groupFolders: string[];
try {
groupFolders = fs.readdirSync(ipcBaseDir).filter((f) => {
const stat = fs.statSync(path.join(ipcBaseDir, f));
return stat.isDirectory() && f !== 'errors';
});
} catch (err) {
logger.error({ err }, 'Error reading IPC base directory');
setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
return;
}
for (const sourceGroup of groupFolders) {
const isMain = sourceGroup === MAIN_GROUP_FOLDER;
const messagesDir = path.join(ipcBaseDir, sourceGroup, 'messages');
const tasksDir = path.join(ipcBaseDir, sourceGroup, 'tasks');
// Process messages from this group's IPC directory
try {
if (fs.existsSync(messagesDir)) {
const messageFiles = fs
.readdirSync(messagesDir)
.filter((f) => f.endsWith('.json'));
for (const file of messageFiles) {
const filePath = path.join(messagesDir, file);
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
if (data.type === 'message' && data.chatJid && data.text) {
// Authorization: verify this group can send to this chatJid
const targetGroup = registeredGroups[data.chatJid];
if (
isMain ||
(targetGroup && targetGroup.folder === sourceGroup)
) {
await sendMessage(
data.chatJid,
`${ASSISTANT_NAME}: ${data.text}`,
);
logger.info(
{ chatJid: data.chatJid, sourceGroup },
'IPC message sent',
);
} else {
logger.warn(
{ chatJid: data.chatJid, sourceGroup },
'Unauthorized IPC message attempt blocked',
);
}
}
fs.unlinkSync(filePath);
} catch (err) {
logger.error(
{ file, sourceGroup, err },
'Error processing IPC message',
);
const errorDir = path.join(ipcBaseDir, 'errors');
fs.mkdirSync(errorDir, { recursive: true });
fs.renameSync(
filePath,
path.join(errorDir, `${sourceGroup}-${file}`),
);
}
}
}
} catch (err) {
logger.error(
{ err, sourceGroup },
'Error reading IPC messages directory',
);
}
// Process tasks from this group's IPC directory
try {
if (fs.existsSync(tasksDir)) {
const taskFiles = fs
.readdirSync(tasksDir)
.filter((f) => f.endsWith('.json'));
for (const file of taskFiles) {
const filePath = path.join(tasksDir, file);
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
// Pass source group identity to processTaskIpc for authorization
await processTaskIpc(data, sourceGroup, isMain);
fs.unlinkSync(filePath);
} catch (err) {
logger.error(
{ file, sourceGroup, err },
'Error processing IPC task',
);
const errorDir = path.join(ipcBaseDir, 'errors');
fs.mkdirSync(errorDir, { recursive: true });
fs.renameSync(
filePath,
path.join(errorDir, `${sourceGroup}-${file}`),
);
}
}
}
} catch (err) {
logger.error({ err, sourceGroup }, 'Error reading IPC tasks directory');
}
}
setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
};
processIpcFiles();
logger.info('IPC watcher started (per-group namespaces)');
}
async function processTaskIpc(
data: {
type: string;
taskId?: string;
prompt?: string;
schedule_type?: string;
schedule_value?: string;
context_mode?: string;
groupFolder?: string;
chatJid?: string;
targetJid?: string;
// For register_group
jid?: string;
name?: string;
folder?: string;
trigger?: string;
containerConfig?: RegisteredGroup['containerConfig'];
},
sourceGroup: string, // Verified identity from IPC directory
isMain: boolean, // Verified from directory path
): Promise<void> {
switch (data.type) {
case 'schedule_task':
if (
data.prompt &&
data.schedule_type &&
data.schedule_value &&
data.targetJid
) {
// Resolve the target group from JID
const targetJid = data.targetJid as string;
const targetGroupEntry = registeredGroups[targetJid];
if (!targetGroupEntry) {
logger.warn(
{ targetJid },
'Cannot schedule task: target group not registered',
);
break;
}
const targetFolder = targetGroupEntry.folder;
// Authorization: non-main groups can only schedule for themselves
if (!isMain && targetFolder !== sourceGroup) {
logger.warn(
{ sourceGroup, targetFolder },
'Unauthorized schedule_task attempt blocked',
);
break;
}
const scheduleType = data.schedule_type as 'cron' | 'interval' | 'once';
let nextRun: string | null = null;
if (scheduleType === 'cron') {
try {
const interval = CronExpressionParser.parse(data.schedule_value, {
tz: TIMEZONE,
});
nextRun = interval.next().toISOString();
} catch {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid cron expression',
);
break;
}
} else if (scheduleType === 'interval') {
const ms = parseInt(data.schedule_value, 10);
if (isNaN(ms) || ms <= 0) {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid interval',
);
break;
}
nextRun = new Date(Date.now() + ms).toISOString();
} else if (scheduleType === 'once') {
const scheduled = new Date(data.schedule_value);
if (isNaN(scheduled.getTime())) {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid timestamp',
);
break;
}
nextRun = scheduled.toISOString();
}
const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const contextMode =
data.context_mode === 'group' || data.context_mode === 'isolated'
? data.context_mode
: 'isolated';
createTask({
id: taskId,
group_folder: targetFolder,
chat_jid: targetJid,
prompt: data.prompt,
schedule_type: scheduleType,
schedule_value: data.schedule_value,
context_mode: contextMode,
next_run: nextRun,
status: 'active',
created_at: new Date().toISOString(),
});
logger.info(
{ taskId, sourceGroup, targetFolder, contextMode },
'Task created via IPC',
);
}
break;
case 'pause_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
updateTask(data.taskId, { status: 'paused' });
logger.info(
{ taskId: data.taskId, sourceGroup },
'Task paused via IPC',
);
} else {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task pause attempt',
);
}
}
break;
case 'resume_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
updateTask(data.taskId, { status: 'active' });
logger.info(
{ taskId: data.taskId, sourceGroup },
'Task resumed via IPC',
);
} else {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task resume attempt',
);
}
}
break;
case 'cancel_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
deleteTask(data.taskId);
logger.info(
{ taskId: data.taskId, sourceGroup },
'Task cancelled via IPC',
);
} else {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task cancel attempt',
);
}
}
break;
case 'refresh_groups':
// Only main group can request a refresh
if (isMain) {
logger.info(
{ sourceGroup },
'Group metadata refresh requested via IPC',
);
await syncGroupMetadata(true);
// Write updated snapshot immediately
const availableGroups = getAvailableGroups();
writeGroupsSnapshot(
sourceGroup,
true,
availableGroups,
new Set(Object.keys(registeredGroups)),
);
} else {
logger.warn(
{ sourceGroup },
'Unauthorized refresh_groups attempt blocked',
);
}
break;
case 'register_group':
// Only main group can register new groups
if (!isMain) {
logger.warn(
{ sourceGroup },
'Unauthorized register_group attempt blocked',
);
break;
}
if (data.jid && data.name && data.folder && data.trigger) {
registerGroup(data.jid, {
name: data.name,
folder: data.folder,
trigger: data.trigger,
added_at: new Date().toISOString(),
containerConfig: data.containerConfig,
});
} else {
logger.warn(
{ data },
'Invalid register_group request - missing required fields',
);
}
break;
default:
logger.warn({ type: data.type }, 'Unknown IPC task type');
}
}
async function connectWhatsApp(): Promise<void> {
const authDir = path.join(STORE_DIR, 'auth');
fs.mkdirSync(authDir, { recursive: true });
const { state, saveCreds } = await useMultiFileAuthState(authDir);
sock = makeWASocket({
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
printQRInTerminal: false,
logger,
browser: ['NanoClaw', 'Chrome', '1.0.0'],
});
sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
if (qr) {
const msg =
'WhatsApp authentication required. Run /setup in Claude Code.';
logger.error(msg);
exec(
`osascript -e 'display notification "${msg}" with title "NanoClaw" sound name "Basso"'`,
);
setTimeout(() => process.exit(1), 1000);
}
if (connection === 'close') {
waConnected = false;
const reason = (lastDisconnect?.error as any)?.output?.statusCode;
const shouldReconnect = reason !== DisconnectReason.loggedOut;
logger.info({ reason, shouldReconnect, queuedMessages: outgoingQueue.length }, 'Connection closed');
if (shouldReconnect) {
logger.info('Reconnecting...');
connectWhatsApp().catch((err) => {
logger.error({ err }, 'Failed to reconnect, retrying in 5s');
setTimeout(() => {
connectWhatsApp().catch((err2) => {
logger.error({ err: err2 }, 'Reconnection retry failed');
});
}, 5000);
});
} else {
logger.info('Logged out. Run /setup to re-authenticate.');
process.exit(0);
}
} else if (connection === 'open') {
waConnected = true;
logger.info('Connected to WhatsApp');
// Build LID to phone mapping from auth state for self-chat translation
if (sock.user) {
const phoneUser = sock.user.id.split(':')[0];
const lidUser = sock.user.lid?.split(':')[0];
if (lidUser && phoneUser) {
lidToPhoneMap[lidUser] = `${phoneUser}@s.whatsapp.net`;
logger.debug({ lidUser, phoneUser }, 'LID to phone mapping set');
}
}
// Flush any messages queued while disconnected
flushOutgoingQueue().catch((err) =>
logger.error({ err }, 'Failed to flush outgoing queue'),
);
// Sync group metadata on startup (respects 24h cache)
syncGroupMetadata().catch((err) =>
logger.error({ err }, 'Initial group sync failed'),
);
// Set up daily sync timer (only once)
if (!groupSyncTimerStarted) {
groupSyncTimerStarted = true;
setInterval(() => {
syncGroupMetadata().catch((err) =>
logger.error({ err }, 'Periodic group sync failed'),
);
}, GROUP_SYNC_INTERVAL_MS);
}
startSchedulerLoop({
registeredGroups: () => registeredGroups,
getSessions: () => sessions,
queue,
onProcess: (groupJid, proc, containerName, groupFolder) => queue.registerProcess(groupJid, proc, containerName, groupFolder),
sendMessage,
assistantName: ASSISTANT_NAME,
});
startIpcWatcher();
queue.setProcessMessagesFn(processGroupMessages);
recoverPendingMessages();
startMessageLoop();
}
});
sock.ev.on('creds.update', saveCreds);
sock.ev.on('messages.upsert', ({ messages }) => {
for (const msg of messages) {
if (!msg.message) continue;
const rawJid = msg.key.remoteJid;
if (!rawJid || rawJid === 'status@broadcast') continue;
// Translate LID JID to phone JID if applicable
const chatJid = translateJid(rawJid);
const timestamp = new Date(
Number(msg.messageTimestamp) * 1000,
).toISOString();
// Always store chat metadata for group discovery
storeChatMetadata(chatJid, timestamp);
// Only store full message content for registered groups
if (registeredGroups[chatJid]) {
storeMessage(
msg,
chatJid,
msg.key.fromMe || false,
msg.pushName || undefined,
);
}
}
});
}
async function startMessageLoop(): Promise<void> {
if (messageLoopRunning) {
logger.debug('Message loop already running, skipping duplicate start');
@@ -1060,15 +455,54 @@ async function main(): Promise<void> {
const shutdown = async (signal: string) => {
logger.info({ signal }, 'Shutdown signal received');
await queue.shutdown(10000);
await whatsapp.disconnect();
process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
await connectWhatsApp();
// Create WhatsApp channel
whatsapp = new WhatsAppChannel({
onMessage: (chatJid, msg) => storeMessage(msg),
onChatMetadata: (chatJid, timestamp) => storeChatMetadata(chatJid, timestamp),
registeredGroups: () => registeredGroups,
});
// Connect — resolves when first connected
await whatsapp.connect();
// Start subsystems (independently of connection handler)
startSchedulerLoop({
registeredGroups: () => registeredGroups,
getSessions: () => sessions,
queue,
onProcess: (groupJid, proc, containerName, groupFolder) => queue.registerProcess(groupJid, proc, containerName, groupFolder),
sendMessage: async (jid, rawText) => {
const text = formatOutbound(whatsapp, rawText);
if (text) await whatsapp.sendMessage(jid, text);
},
});
startIpcWatcher({
sendMessage: (jid, text) => whatsapp.sendMessage(jid, text),
registeredGroups: () => registeredGroups,
registerGroup,
syncGroupMetadata: (force) => whatsapp.syncGroupMetadata(force),
getAvailableGroups,
writeGroupsSnapshot: (gf, im, ag, rj) => writeGroupsSnapshot(gf, im, ag, rj),
});
queue.setProcessMessagesFn(processGroupMessages);
recoverPendingMessages();
startMessageLoop();
}
main().catch((err) => {
logger.error({ err }, 'Failed to start NanoClaw');
process.exit(1);
});
// Guard: only run when executed directly, not when imported by tests
const isDirectRun =
process.argv[1] &&
new URL(import.meta.url).pathname === new URL(`file://${process.argv[1]}`).pathname;
if (isDirectRun) {
main().catch((err) => {
logger.error({ err }, 'Failed to start NanoClaw');
process.exit(1);
});
}

594
src/ipc-auth.test.ts Normal file
View File

@@ -0,0 +1,594 @@
import { describe, it, expect, beforeEach } from 'vitest';
import {
_initTestDatabase,
createTask,
getAllTasks,
getRegisteredGroup,
getTaskById,
setRegisteredGroup,
} from './db.js';
import { processTaskIpc, IpcDeps } from './ipc.js';
import { RegisteredGroup } from './types.js';
// Set up registered groups used across tests
const MAIN_GROUP: RegisteredGroup = {
name: 'Main',
folder: 'main',
trigger: 'always',
added_at: '2024-01-01T00:00:00.000Z',
};
const OTHER_GROUP: RegisteredGroup = {
name: 'Other',
folder: 'other-group',
trigger: '@Andy',
added_at: '2024-01-01T00:00:00.000Z',
};
const THIRD_GROUP: RegisteredGroup = {
name: 'Third',
folder: 'third-group',
trigger: '@Andy',
added_at: '2024-01-01T00:00:00.000Z',
};
let groups: Record<string, RegisteredGroup>;
let deps: IpcDeps;
beforeEach(() => {
_initTestDatabase();
groups = {
'main@g.us': MAIN_GROUP,
'other@g.us': OTHER_GROUP,
'third@g.us': THIRD_GROUP,
};
// Populate DB as well
setRegisteredGroup('main@g.us', MAIN_GROUP);
setRegisteredGroup('other@g.us', OTHER_GROUP);
setRegisteredGroup('third@g.us', THIRD_GROUP);
deps = {
sendMessage: async () => {},
registeredGroups: () => groups,
registerGroup: (jid, group) => {
groups[jid] = group;
setRegisteredGroup(jid, group);
// Mock the fs.mkdirSync that registerGroup does
},
syncGroupMetadata: async () => {},
getAvailableGroups: () => [],
writeGroupsSnapshot: () => {},
};
});
// --- schedule_task authorization ---
describe('schedule_task authorization', () => {
it('main group can schedule for another group', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'do something',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
// Verify task was created in DB for the other group
const allTasks = getAllTasks();
expect(allTasks.length).toBe(1);
expect(allTasks[0].group_folder).toBe('other-group');
});
it('non-main group can schedule for itself', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'self task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'other@g.us',
},
'other-group',
false,
deps,
);
const allTasks = getAllTasks();
expect(allTasks.length).toBe(1);
expect(allTasks[0].group_folder).toBe('other-group');
});
it('non-main group cannot schedule for another group', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'unauthorized',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'main@g.us',
},
'other-group',
false,
deps,
);
const allTasks = getAllTasks();
expect(allTasks.length).toBe(0);
});
it('rejects schedule_task for unregistered target JID', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'no target',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'unknown@g.us',
},
'main',
true,
deps,
);
const allTasks = getAllTasks();
expect(allTasks.length).toBe(0);
});
});
// --- pause_task authorization ---
describe('pause_task authorization', () => {
beforeEach(() => {
createTask({
id: 'task-main',
group_folder: 'main',
chat_jid: 'main@g.us',
prompt: 'main task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2025-06-01T00:00:00.000Z',
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
createTask({
id: 'task-other',
group_folder: 'other-group',
chat_jid: 'other@g.us',
prompt: 'other task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2025-06-01T00:00:00.000Z',
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
});
it('main group can pause any task', async () => {
await processTaskIpc({ type: 'pause_task', taskId: 'task-other' }, 'main', true, deps);
expect(getTaskById('task-other')!.status).toBe('paused');
});
it('non-main group can pause its own task', async () => {
await processTaskIpc({ type: 'pause_task', taskId: 'task-other' }, 'other-group', false, deps);
expect(getTaskById('task-other')!.status).toBe('paused');
});
it('non-main group cannot pause another groups task', async () => {
await processTaskIpc({ type: 'pause_task', taskId: 'task-main' }, 'other-group', false, deps);
expect(getTaskById('task-main')!.status).toBe('active');
});
});
// --- resume_task authorization ---
describe('resume_task authorization', () => {
beforeEach(() => {
createTask({
id: 'task-paused',
group_folder: 'other-group',
chat_jid: 'other@g.us',
prompt: 'paused task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: '2025-06-01T00:00:00.000Z',
status: 'paused',
created_at: '2024-01-01T00:00:00.000Z',
});
});
it('main group can resume any task', async () => {
await processTaskIpc({ type: 'resume_task', taskId: 'task-paused' }, 'main', true, deps);
expect(getTaskById('task-paused')!.status).toBe('active');
});
it('non-main group can resume its own task', async () => {
await processTaskIpc({ type: 'resume_task', taskId: 'task-paused' }, 'other-group', false, deps);
expect(getTaskById('task-paused')!.status).toBe('active');
});
it('non-main group cannot resume another groups task', async () => {
await processTaskIpc({ type: 'resume_task', taskId: 'task-paused' }, 'third-group', false, deps);
expect(getTaskById('task-paused')!.status).toBe('paused');
});
});
// --- cancel_task authorization ---
describe('cancel_task authorization', () => {
it('main group can cancel any task', async () => {
createTask({
id: 'task-to-cancel',
group_folder: 'other-group',
chat_jid: 'other@g.us',
prompt: 'cancel me',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
await processTaskIpc({ type: 'cancel_task', taskId: 'task-to-cancel' }, 'main', true, deps);
expect(getTaskById('task-to-cancel')).toBeUndefined();
});
it('non-main group can cancel its own task', async () => {
createTask({
id: 'task-own',
group_folder: 'other-group',
chat_jid: 'other@g.us',
prompt: 'my task',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
await processTaskIpc({ type: 'cancel_task', taskId: 'task-own' }, 'other-group', false, deps);
expect(getTaskById('task-own')).toBeUndefined();
});
it('non-main group cannot cancel another groups task', async () => {
createTask({
id: 'task-foreign',
group_folder: 'main',
chat_jid: 'main@g.us',
prompt: 'not yours',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
next_run: null,
status: 'active',
created_at: '2024-01-01T00:00:00.000Z',
});
await processTaskIpc({ type: 'cancel_task', taskId: 'task-foreign' }, 'other-group', false, deps);
expect(getTaskById('task-foreign')).toBeDefined();
});
});
// --- register_group authorization ---
describe('register_group authorization', () => {
it('non-main group cannot register a group', async () => {
await processTaskIpc(
{
type: 'register_group',
jid: 'new@g.us',
name: 'New Group',
folder: 'new-group',
trigger: '@Andy',
},
'other-group',
false,
deps,
);
// registeredGroups should not have changed
expect(groups['new@g.us']).toBeUndefined();
});
});
// --- refresh_groups authorization ---
describe('refresh_groups authorization', () => {
it('non-main group cannot trigger refresh', async () => {
// This should be silently blocked (no crash, no effect)
await processTaskIpc({ type: 'refresh_groups' }, 'other-group', false, deps);
// If we got here without error, the auth gate worked
});
});
// --- IPC message authorization ---
// Tests the authorization pattern from startIpcWatcher (ipc.ts).
// The logic: isMain || (targetGroup && targetGroup.folder === sourceGroup)
describe('IPC message authorization', () => {
// Replicate the exact check from the IPC watcher
function isMessageAuthorized(
sourceGroup: string,
isMain: boolean,
targetChatJid: string,
registeredGroups: Record<string, RegisteredGroup>,
): boolean {
const targetGroup = registeredGroups[targetChatJid];
return isMain || (!!targetGroup && targetGroup.folder === sourceGroup);
}
it('main group can send to any group', () => {
expect(isMessageAuthorized('main', true, 'other@g.us', groups)).toBe(true);
expect(isMessageAuthorized('main', true, 'third@g.us', groups)).toBe(true);
});
it('non-main group can send to its own chat', () => {
expect(isMessageAuthorized('other-group', false, 'other@g.us', groups)).toBe(true);
});
it('non-main group cannot send to another groups chat', () => {
expect(isMessageAuthorized('other-group', false, 'main@g.us', groups)).toBe(false);
expect(isMessageAuthorized('other-group', false, 'third@g.us', groups)).toBe(false);
});
it('non-main group cannot send to unregistered JID', () => {
expect(isMessageAuthorized('other-group', false, 'unknown@g.us', groups)).toBe(false);
});
it('main group can send to unregistered JID', () => {
// Main is always authorized regardless of target
expect(isMessageAuthorized('main', true, 'unknown@g.us', groups)).toBe(true);
});
});
// --- schedule_task with cron and interval types ---
describe('schedule_task schedule types', () => {
it('creates task with cron schedule and computes next_run', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'cron task',
schedule_type: 'cron',
schedule_value: '0 9 * * *', // every day at 9am
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
const tasks = getAllTasks();
expect(tasks).toHaveLength(1);
expect(tasks[0].schedule_type).toBe('cron');
expect(tasks[0].next_run).toBeTruthy();
// next_run should be a valid ISO date in the future
expect(new Date(tasks[0].next_run!).getTime()).toBeGreaterThan(Date.now() - 60000);
});
it('rejects invalid cron expression', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'bad cron',
schedule_type: 'cron',
schedule_value: 'not a cron',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
expect(getAllTasks()).toHaveLength(0);
});
it('creates task with interval schedule', async () => {
const before = Date.now();
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'interval task',
schedule_type: 'interval',
schedule_value: '3600000', // 1 hour
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
const tasks = getAllTasks();
expect(tasks).toHaveLength(1);
expect(tasks[0].schedule_type).toBe('interval');
// next_run should be ~1 hour from now
const nextRun = new Date(tasks[0].next_run!).getTime();
expect(nextRun).toBeGreaterThanOrEqual(before + 3600000 - 1000);
expect(nextRun).toBeLessThanOrEqual(Date.now() + 3600000 + 1000);
});
it('rejects invalid interval (non-numeric)', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'bad interval',
schedule_type: 'interval',
schedule_value: 'abc',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
expect(getAllTasks()).toHaveLength(0);
});
it('rejects invalid interval (zero)', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'zero interval',
schedule_type: 'interval',
schedule_value: '0',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
expect(getAllTasks()).toHaveLength(0);
});
it('rejects invalid once timestamp', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'bad once',
schedule_type: 'once',
schedule_value: 'not-a-date',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
expect(getAllTasks()).toHaveLength(0);
});
});
// --- context_mode defaulting ---
describe('schedule_task context_mode', () => {
it('accepts context_mode=group', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'group context',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'group',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
const tasks = getAllTasks();
expect(tasks[0].context_mode).toBe('group');
});
it('accepts context_mode=isolated', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'isolated context',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'isolated',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
const tasks = getAllTasks();
expect(tasks[0].context_mode).toBe('isolated');
});
it('defaults invalid context_mode to isolated', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'bad context',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
context_mode: 'bogus' as any,
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
const tasks = getAllTasks();
expect(tasks[0].context_mode).toBe('isolated');
});
it('defaults missing context_mode to isolated', async () => {
await processTaskIpc(
{
type: 'schedule_task',
prompt: 'no context mode',
schedule_type: 'once',
schedule_value: '2025-06-01T00:00:00.000Z',
targetJid: 'other@g.us',
},
'main',
true,
deps,
);
const tasks = getAllTasks();
expect(tasks[0].context_mode).toBe('isolated');
});
});
// --- register_group success path ---
describe('register_group success', () => {
it('main group can register a new group', async () => {
await processTaskIpc(
{
type: 'register_group',
jid: 'new@g.us',
name: 'New Group',
folder: 'new-group',
trigger: '@Andy',
},
'main',
true,
deps,
);
// Verify group was registered in DB
const group = getRegisteredGroup('new@g.us');
expect(group).toBeDefined();
expect(group!.name).toBe('New Group');
expect(group!.folder).toBe('new-group');
expect(group!.trigger).toBe('@Andy');
});
it('register_group rejects request with missing fields', async () => {
await processTaskIpc(
{
type: 'register_group',
jid: 'partial@g.us',
name: 'Partial',
// missing folder and trigger
},
'main',
true,
deps,
);
expect(getRegisteredGroup('partial@g.us')).toBeUndefined();
});
});

381
src/ipc.ts Normal file
View File

@@ -0,0 +1,381 @@
import fs from 'fs';
import path from 'path';
import { CronExpressionParser } from 'cron-parser';
import {
ASSISTANT_NAME,
DATA_DIR,
IPC_POLL_INTERVAL,
MAIN_GROUP_FOLDER,
TIMEZONE,
} from './config.js';
import { AvailableGroup } from './container-runner.js';
import { createTask, deleteTask, getTaskById, updateTask } from './db.js';
import { logger } from './logger.js';
import { RegisteredGroup } from './types.js';
export interface IpcDeps {
sendMessage: (jid: string, text: string) => Promise<void>;
registeredGroups: () => Record<string, RegisteredGroup>;
registerGroup: (jid: string, group: RegisteredGroup) => void;
syncGroupMetadata: (force: boolean) => Promise<void>;
getAvailableGroups: () => AvailableGroup[];
writeGroupsSnapshot: (
groupFolder: string,
isMain: boolean,
availableGroups: AvailableGroup[],
registeredJids: Set<string>,
) => void;
}
let ipcWatcherRunning = false;
export function startIpcWatcher(deps: IpcDeps): void {
if (ipcWatcherRunning) {
logger.debug('IPC watcher already running, skipping duplicate start');
return;
}
ipcWatcherRunning = true;
const ipcBaseDir = path.join(DATA_DIR, 'ipc');
fs.mkdirSync(ipcBaseDir, { recursive: true });
const processIpcFiles = async () => {
// Scan all group IPC directories (identity determined by directory)
let groupFolders: string[];
try {
groupFolders = fs.readdirSync(ipcBaseDir).filter((f) => {
const stat = fs.statSync(path.join(ipcBaseDir, f));
return stat.isDirectory() && f !== 'errors';
});
} catch (err) {
logger.error({ err }, 'Error reading IPC base directory');
setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
return;
}
const registeredGroups = deps.registeredGroups();
for (const sourceGroup of groupFolders) {
const isMain = sourceGroup === MAIN_GROUP_FOLDER;
const messagesDir = path.join(ipcBaseDir, sourceGroup, 'messages');
const tasksDir = path.join(ipcBaseDir, sourceGroup, 'tasks');
// Process messages from this group's IPC directory
try {
if (fs.existsSync(messagesDir)) {
const messageFiles = fs
.readdirSync(messagesDir)
.filter((f) => f.endsWith('.json'));
for (const file of messageFiles) {
const filePath = path.join(messagesDir, file);
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
if (data.type === 'message' && data.chatJid && data.text) {
// Authorization: verify this group can send to this chatJid
const targetGroup = registeredGroups[data.chatJid];
if (
isMain ||
(targetGroup && targetGroup.folder === sourceGroup)
) {
await deps.sendMessage(
data.chatJid,
`${ASSISTANT_NAME}: ${data.text}`,
);
logger.info(
{ chatJid: data.chatJid, sourceGroup },
'IPC message sent',
);
} else {
logger.warn(
{ chatJid: data.chatJid, sourceGroup },
'Unauthorized IPC message attempt blocked',
);
}
}
fs.unlinkSync(filePath);
} catch (err) {
logger.error(
{ file, sourceGroup, err },
'Error processing IPC message',
);
const errorDir = path.join(ipcBaseDir, 'errors');
fs.mkdirSync(errorDir, { recursive: true });
fs.renameSync(
filePath,
path.join(errorDir, `${sourceGroup}-${file}`),
);
}
}
}
} catch (err) {
logger.error(
{ err, sourceGroup },
'Error reading IPC messages directory',
);
}
// Process tasks from this group's IPC directory
try {
if (fs.existsSync(tasksDir)) {
const taskFiles = fs
.readdirSync(tasksDir)
.filter((f) => f.endsWith('.json'));
for (const file of taskFiles) {
const filePath = path.join(tasksDir, file);
try {
const data = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
// Pass source group identity to processTaskIpc for authorization
await processTaskIpc(data, sourceGroup, isMain, deps);
fs.unlinkSync(filePath);
} catch (err) {
logger.error(
{ file, sourceGroup, err },
'Error processing IPC task',
);
const errorDir = path.join(ipcBaseDir, 'errors');
fs.mkdirSync(errorDir, { recursive: true });
fs.renameSync(
filePath,
path.join(errorDir, `${sourceGroup}-${file}`),
);
}
}
}
} catch (err) {
logger.error({ err, sourceGroup }, 'Error reading IPC tasks directory');
}
}
setTimeout(processIpcFiles, IPC_POLL_INTERVAL);
};
processIpcFiles();
logger.info('IPC watcher started (per-group namespaces)');
}
export async function processTaskIpc(
data: {
type: string;
taskId?: string;
prompt?: string;
schedule_type?: string;
schedule_value?: string;
context_mode?: string;
groupFolder?: string;
chatJid?: string;
targetJid?: string;
// For register_group
jid?: string;
name?: string;
folder?: string;
trigger?: string;
containerConfig?: RegisteredGroup['containerConfig'];
},
sourceGroup: string, // Verified identity from IPC directory
isMain: boolean, // Verified from directory path
deps: IpcDeps,
): Promise<void> {
const registeredGroups = deps.registeredGroups();
switch (data.type) {
case 'schedule_task':
if (
data.prompt &&
data.schedule_type &&
data.schedule_value &&
data.targetJid
) {
// Resolve the target group from JID
const targetJid = data.targetJid as string;
const targetGroupEntry = registeredGroups[targetJid];
if (!targetGroupEntry) {
logger.warn(
{ targetJid },
'Cannot schedule task: target group not registered',
);
break;
}
const targetFolder = targetGroupEntry.folder;
// Authorization: non-main groups can only schedule for themselves
if (!isMain && targetFolder !== sourceGroup) {
logger.warn(
{ sourceGroup, targetFolder },
'Unauthorized schedule_task attempt blocked',
);
break;
}
const scheduleType = data.schedule_type as 'cron' | 'interval' | 'once';
let nextRun: string | null = null;
if (scheduleType === 'cron') {
try {
const interval = CronExpressionParser.parse(data.schedule_value, {
tz: TIMEZONE,
});
nextRun = interval.next().toISOString();
} catch {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid cron expression',
);
break;
}
} else if (scheduleType === 'interval') {
const ms = parseInt(data.schedule_value, 10);
if (isNaN(ms) || ms <= 0) {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid interval',
);
break;
}
nextRun = new Date(Date.now() + ms).toISOString();
} else if (scheduleType === 'once') {
const scheduled = new Date(data.schedule_value);
if (isNaN(scheduled.getTime())) {
logger.warn(
{ scheduleValue: data.schedule_value },
'Invalid timestamp',
);
break;
}
nextRun = scheduled.toISOString();
}
const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const contextMode =
data.context_mode === 'group' || data.context_mode === 'isolated'
? data.context_mode
: 'isolated';
createTask({
id: taskId,
group_folder: targetFolder,
chat_jid: targetJid,
prompt: data.prompt,
schedule_type: scheduleType,
schedule_value: data.schedule_value,
context_mode: contextMode,
next_run: nextRun,
status: 'active',
created_at: new Date().toISOString(),
});
logger.info(
{ taskId, sourceGroup, targetFolder, contextMode },
'Task created via IPC',
);
}
break;
case 'pause_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
updateTask(data.taskId, { status: 'paused' });
logger.info(
{ taskId: data.taskId, sourceGroup },
'Task paused via IPC',
);
} else {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task pause attempt',
);
}
}
break;
case 'resume_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
updateTask(data.taskId, { status: 'active' });
logger.info(
{ taskId: data.taskId, sourceGroup },
'Task resumed via IPC',
);
} else {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task resume attempt',
);
}
}
break;
case 'cancel_task':
if (data.taskId) {
const task = getTaskById(data.taskId);
if (task && (isMain || task.group_folder === sourceGroup)) {
deleteTask(data.taskId);
logger.info(
{ taskId: data.taskId, sourceGroup },
'Task cancelled via IPC',
);
} else {
logger.warn(
{ taskId: data.taskId, sourceGroup },
'Unauthorized task cancel attempt',
);
}
}
break;
case 'refresh_groups':
// Only main group can request a refresh
if (isMain) {
logger.info(
{ sourceGroup },
'Group metadata refresh requested via IPC',
);
await deps.syncGroupMetadata(true);
// Write updated snapshot immediately
const availableGroups = deps.getAvailableGroups();
deps.writeGroupsSnapshot(
sourceGroup,
true,
availableGroups,
new Set(Object.keys(registeredGroups)),
);
} else {
logger.warn(
{ sourceGroup },
'Unauthorized refresh_groups attempt blocked',
);
}
break;
case 'register_group':
// Only main group can register new groups
if (!isMain) {
logger.warn(
{ sourceGroup },
'Unauthorized register_group attempt blocked',
);
break;
}
if (data.jid && data.name && data.folder && data.trigger) {
deps.registerGroup(data.jid, {
name: data.name,
folder: data.folder,
trigger: data.trigger,
added_at: new Date().toISOString(),
containerConfig: data.containerConfig,
});
} else {
logger.warn(
{ data },
'Invalid register_group request - missing required fields',
);
}
break;
default:
logger.warn({ type: data.type }, 'Unknown IPC task type');
}
}

46
src/router.ts Normal file
View File

@@ -0,0 +1,46 @@
import { ASSISTANT_NAME } from './config.js';
import { Channel, NewMessage } from './types.js';
export function escapeXml(s: string): string {
return s
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
}
export function formatMessages(messages: NewMessage[]): string {
const lines = messages.map((m) =>
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`,
);
return `<messages>\n${lines.join('\n')}\n</messages>`;
}
export function stripInternalTags(text: string): string {
return text.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
}
export function formatOutbound(channel: Channel, rawText: string): string {
const text = stripInternalTags(rawText);
if (!text) return '';
const prefix =
channel.prefixAssistantName !== false ? `${ASSISTANT_NAME}: ` : '';
return `${prefix}${text}`;
}
export function routeOutbound(
channels: Channel[],
jid: string,
text: string,
): Promise<void> {
const channel = channels.find((c) => c.ownsJid(jid) && c.isConnected());
if (!channel) throw new Error(`No channel for JID: ${jid}`);
return channel.sendMessage(jid, text);
}
export function findChannel(
channels: Channel[],
jid: string,
): Channel | undefined {
return channels.find((c) => c.ownsJid(jid));
}

91
src/routing.test.ts Normal file
View File

@@ -0,0 +1,91 @@
import { describe, it, expect, beforeEach } from 'vitest';
import { _initTestDatabase, getAllChats, storeChatMetadata } from './db.js';
import { getAvailableGroups, _setRegisteredGroups } from './index.js';
beforeEach(() => {
_initTestDatabase();
_setRegisteredGroups({});
});
// --- JID ownership patterns ---
describe('JID ownership patterns', () => {
// These test the patterns that will become ownsJid() on the Channel interface
it('WhatsApp group JID: ends with @g.us', () => {
const jid = '12345678@g.us';
expect(jid.endsWith('@g.us')).toBe(true);
});
it('WhatsApp DM JID: ends with @s.whatsapp.net', () => {
const jid = '12345678@s.whatsapp.net';
expect(jid.endsWith('@s.whatsapp.net')).toBe(true);
});
it('unknown JID format: does not match WhatsApp patterns', () => {
const jid = 'unknown:12345';
expect(jid.endsWith('@g.us')).toBe(false);
expect(jid.endsWith('@s.whatsapp.net')).toBe(false);
});
});
// --- getAvailableGroups ---
describe('getAvailableGroups', () => {
it('returns only @g.us JIDs', () => {
storeChatMetadata('group1@g.us', '2024-01-01T00:00:01.000Z', 'Group 1');
storeChatMetadata('user@s.whatsapp.net', '2024-01-01T00:00:02.000Z', 'User DM');
storeChatMetadata('group2@g.us', '2024-01-01T00:00:03.000Z', 'Group 2');
const groups = getAvailableGroups();
expect(groups).toHaveLength(2);
expect(groups.every((g) => g.jid.endsWith('@g.us'))).toBe(true);
});
it('excludes __group_sync__ sentinel', () => {
storeChatMetadata('__group_sync__', '2024-01-01T00:00:00.000Z');
storeChatMetadata('group@g.us', '2024-01-01T00:00:01.000Z', 'Group');
const groups = getAvailableGroups();
expect(groups).toHaveLength(1);
expect(groups[0].jid).toBe('group@g.us');
});
it('marks registered groups correctly', () => {
storeChatMetadata('reg@g.us', '2024-01-01T00:00:01.000Z', 'Registered');
storeChatMetadata('unreg@g.us', '2024-01-01T00:00:02.000Z', 'Unregistered');
_setRegisteredGroups({
'reg@g.us': {
name: 'Registered',
folder: 'registered',
trigger: '@Andy',
added_at: '2024-01-01T00:00:00.000Z',
},
});
const groups = getAvailableGroups();
const reg = groups.find((g) => g.jid === 'reg@g.us');
const unreg = groups.find((g) => g.jid === 'unreg@g.us');
expect(reg?.isRegistered).toBe(true);
expect(unreg?.isRegistered).toBe(false);
});
it('returns groups ordered by most recent activity', () => {
storeChatMetadata('old@g.us', '2024-01-01T00:00:01.000Z', 'Old');
storeChatMetadata('new@g.us', '2024-01-01T00:00:05.000Z', 'New');
storeChatMetadata('mid@g.us', '2024-01-01T00:00:03.000Z', 'Mid');
const groups = getAvailableGroups();
expect(groups[0].jid).toBe('new@g.us');
expect(groups[1].jid).toBe('mid@g.us');
expect(groups[2].jid).toBe('old@g.us');
});
it('returns empty array when no chats exist', () => {
const groups = getAvailableGroups();
expect(groups).toHaveLength(0);
});
});

View File

@@ -28,7 +28,6 @@ export interface SchedulerDependencies {
queue: GroupQueue;
onProcess: (groupJid: string, proc: ChildProcess, containerName: string, groupFolder: string) => void;
sendMessage: (jid: string, text: string) => Promise<void>;
assistantName: string;
}
async function runTask(
@@ -117,11 +116,8 @@ async function runTask(
async (streamedOutput: ContainerOutput) => {
if (streamedOutput.result) {
result = streamedOutput.result;
// Forward result to user (strip <internal> tags)
const text = streamedOutput.result.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
if (text) {
await deps.sendMessage(task.chat_jid, `${deps.assistantName}: ${text}`);
}
// Forward result to user (sendMessage handles formatting)
await deps.sendMessage(task.chat_jid, streamedOutput.result);
// Only reset idle timer on actual results, not session-update markers
resetIdleTimer();
}

327
src/telegram.ts Normal file
View File

@@ -0,0 +1,327 @@
import { Api, Bot } from 'grammy';
import {
ASSISTANT_NAME,
TRIGGER_PATTERN,
} from './config.js';
import {
getAllRegisteredGroups,
storeChatMetadata,
storeMessageDirect,
} from './db.js';
import { logger } from './logger.js';
let bot: Bot | null = null;
// Bot pool for agent teams: send-only Api instances (no polling)
const poolApis: Api[] = [];
// Current display name for each pool bot (from getMe at startup)
const poolBotNames: string[] = [];
// Maps "{groupFolder}:{senderName}" → pool Api index for stable assignment
const senderBotMap = new Map<string, number>();
// Tracks which pool indices are already claimed this session
const assignedIndices = new Set<number>();
/** Store a placeholder message for non-text content (photos, voice, etc.) */
function storeNonTextMessage(ctx: any, placeholder: string): void {
const chatId = `tg:${ctx.chat.id}`;
const registeredGroups = getAllRegisteredGroups();
if (!registeredGroups[chatId]) return;
const timestamp = new Date(ctx.message.date * 1000).toISOString();
const senderName =
ctx.from?.first_name || ctx.from?.username || ctx.from?.id?.toString() || 'Unknown';
const caption = ctx.message.caption ? ` ${ctx.message.caption}` : '';
storeChatMetadata(chatId, timestamp);
storeMessageDirect({
id: ctx.message.message_id.toString(),
chat_jid: chatId,
sender: ctx.from?.id?.toString() || '',
sender_name: senderName,
content: `${placeholder}${caption}`,
timestamp,
is_from_me: false,
});
}
export async function connectTelegram(botToken: string): Promise<void> {
bot = new Bot(botToken);
// Command to get chat ID (useful for registration)
bot.command('chatid', (ctx) => {
const chatId = ctx.chat.id;
const chatType = ctx.chat.type;
const chatName =
chatType === 'private'
? ctx.from?.first_name || 'Private'
: (ctx.chat as any).title || 'Unknown';
ctx.reply(
`Chat ID: \`tg:${chatId}\`\nName: ${chatName}\nType: ${chatType}`,
{ parse_mode: 'Markdown' },
);
});
// Command to check bot status
bot.command('ping', (ctx) => {
ctx.reply(`${ASSISTANT_NAME} is online.`);
});
bot.on('message:text', async (ctx) => {
// Skip commands
if (ctx.message.text.startsWith('/')) return;
const chatId = `tg:${ctx.chat.id}`;
let content = ctx.message.text;
const timestamp = new Date(ctx.message.date * 1000).toISOString();
const senderName =
ctx.from?.first_name ||
ctx.from?.username ||
ctx.from?.id.toString() ||
'Unknown';
const sender = ctx.from?.id.toString() || '';
const msgId = ctx.message.message_id.toString();
// Determine chat name
const chatName =
ctx.chat.type === 'private'
? senderName
: (ctx.chat as any).title || chatId;
// Translate Telegram @bot_username mentions into TRIGGER_PATTERN format.
// Telegram @mentions (e.g., @andy_ai_bot) won't match TRIGGER_PATTERN
// (e.g., ^@Andy\b), so we prepend the trigger when the bot is @mentioned.
const botUsername = ctx.me?.username?.toLowerCase();
if (botUsername) {
const entities = ctx.message.entities || [];
const isBotMentioned = entities.some((entity) => {
if (entity.type === 'mention') {
const mentionText = content
.substring(entity.offset, entity.offset + entity.length)
.toLowerCase();
return mentionText === `@${botUsername}`;
}
return false;
});
if (isBotMentioned && !TRIGGER_PATTERN.test(content)) {
content = `@${ASSISTANT_NAME} ${content}`;
}
}
// Store chat metadata for discovery
storeChatMetadata(chatId, timestamp, chatName);
// Check if this chat is registered
const registeredGroups = getAllRegisteredGroups();
const group = registeredGroups[chatId];
if (!group) {
logger.debug(
{ chatId, chatName },
'Message from unregistered Telegram chat',
);
return;
}
// Store message — startMessageLoop() will pick it up
storeMessageDirect({
id: msgId,
chat_jid: chatId,
sender,
sender_name: senderName,
content,
timestamp,
is_from_me: false,
});
logger.info(
{ chatId, chatName, sender: senderName },
'Telegram message stored',
);
});
// Handle non-text messages with placeholders so the agent knows something was sent
bot.on('message:photo', (ctx) => storeNonTextMessage(ctx, '[Photo]'));
bot.on('message:video', (ctx) => storeNonTextMessage(ctx, '[Video]'));
bot.on('message:voice', (ctx) => storeNonTextMessage(ctx, '[Voice message]'));
bot.on('message:audio', (ctx) => storeNonTextMessage(ctx, '[Audio]'));
bot.on('message:document', (ctx) => {
const name = ctx.message.document?.file_name || 'file';
storeNonTextMessage(ctx, `[Document: ${name}]`);
});
bot.on('message:sticker', (ctx) => {
const emoji = ctx.message.sticker?.emoji || '';
storeNonTextMessage(ctx, `[Sticker ${emoji}]`);
});
bot.on('message:location', (ctx) => storeNonTextMessage(ctx, '[Location]'));
bot.on('message:contact', (ctx) => storeNonTextMessage(ctx, '[Contact]'));
// Handle errors gracefully
bot.catch((err) => {
logger.error({ err: err.message }, 'Telegram bot error');
});
// Start polling
bot.start({
onStart: (botInfo) => {
logger.info(
{ username: botInfo.username, id: botInfo.id },
'Telegram bot connected',
);
console.log(`\n Telegram bot: @${botInfo.username}`);
console.log(
` Send /chatid to the bot to get a chat's registration ID\n`,
);
},
});
}
export async function sendTelegramMessage(
chatId: string,
text: string,
): Promise<void> {
if (!bot) {
logger.warn('Telegram bot not initialized');
return;
}
try {
const numericId = chatId.replace(/^tg:/, '');
// Telegram has a 4096 character limit per message — split if needed
const MAX_LENGTH = 4096;
if (text.length <= MAX_LENGTH) {
await bot.api.sendMessage(numericId, text);
} else {
for (let i = 0; i < text.length; i += MAX_LENGTH) {
await bot.api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH));
}
}
logger.info({ chatId, length: text.length }, 'Telegram message sent');
} catch (err) {
logger.error({ chatId, err }, 'Failed to send Telegram message');
}
}
export async function setTelegramTyping(chatId: string): Promise<void> {
if (!bot) return;
try {
const numericId = chatId.replace(/^tg:/, '');
await bot.api.sendChatAction(numericId, 'typing');
} catch (err) {
logger.debug({ chatId, err }, 'Failed to send Telegram typing indicator');
}
}
/**
* Initialize send-only Api instances for the bot pool.
* Each pool bot can send messages but doesn't poll for updates.
*/
export async function initBotPool(tokens: string[]): Promise<void> {
for (const token of tokens) {
try {
const api = new Api(token);
const me = await api.getMe();
poolApis.push(api);
poolBotNames.push(me.first_name);
logger.info(
{ username: me.username, name: me.first_name, id: me.id, poolSize: poolApis.length },
'Pool bot initialized',
);
} catch (err) {
logger.error({ err }, 'Failed to initialize pool bot');
}
}
if (poolApis.length > 0) {
logger.info({ count: poolApis.length, names: poolBotNames }, 'Telegram bot pool ready');
}
}
/**
* Send a message via a pool bot assigned to the given sender name.
* Assignment priority:
* 1. Already assigned to this sender this session → reuse
* 2. A pool bot whose current name matches the sender → claim it (no rename needed)
* 3. First unassigned pool bot → claim and rename
* 4. All claimed → wrap around (reuse + rename)
*/
export async function sendPoolMessage(
chatId: string,
text: string,
sender: string,
groupFolder: string,
): Promise<void> {
if (poolApis.length === 0) {
// No pool bots — fall back to main bot
await sendTelegramMessage(chatId, text);
return;
}
const key = `${groupFolder}:${sender}`;
let idx = senderBotMap.get(key);
if (idx === undefined) {
// 1. Check if any pool bot already has this name (from a previous session)
const nameMatch = poolBotNames.findIndex(
(name, i) => name === sender && !assignedIndices.has(i),
);
if (nameMatch !== -1) {
idx = nameMatch;
assignedIndices.add(idx);
senderBotMap.set(key, idx);
logger.info({ sender, groupFolder, poolIndex: idx }, 'Matched pool bot by name');
} else {
// 2. Pick first unassigned bot
let freeIdx = -1;
for (let i = 0; i < poolApis.length; i++) {
if (!assignedIndices.has(i)) {
freeIdx = i;
break;
}
}
// 3. All assigned — wrap around to least recently used
if (freeIdx === -1) freeIdx = assignedIndices.size % poolApis.length;
idx = freeIdx;
assignedIndices.add(idx);
senderBotMap.set(key, idx);
// Rename the bot, then wait for Telegram to propagate
try {
await poolApis[idx].setMyName(sender);
poolBotNames[idx] = sender;
await new Promise((r) => setTimeout(r, 2000));
logger.info({ sender, groupFolder, poolIndex: idx }, 'Assigned and renamed pool bot');
} catch (err) {
logger.warn({ sender, err }, 'Failed to rename pool bot (sending anyway)');
}
}
}
const api = poolApis[idx];
try {
const numericId = chatId.replace(/^tg:/, '');
const MAX_LENGTH = 4096;
if (text.length <= MAX_LENGTH) {
await api.sendMessage(numericId, text);
} else {
for (let i = 0; i < text.length; i += MAX_LENGTH) {
await api.sendMessage(numericId, text.slice(i, i + MAX_LENGTH));
}
}
logger.info({ chatId, sender, poolIndex: idx, length: text.length }, 'Pool message sent');
} catch (err) {
logger.error({ chatId, sender, err }, 'Failed to send pool message');
}
}
export function isTelegramConnected(): boolean {
return bot !== null;
}
export function stopTelegram(): void {
if (bot) {
bot.stop();
bot = null;
logger.info('Telegram bot stopped');
}
}

View File

@@ -48,6 +48,7 @@ export interface NewMessage {
sender_name: string;
content: string;
timestamp: string;
is_from_me?: boolean;
}
export interface ScheduledTask {
@@ -73,3 +74,28 @@ export interface TaskRunLog {
result: string | null;
error: string | null;
}
// --- Channel abstraction ---
export interface Channel {
name: string;
connect(): Promise<void>;
sendMessage(jid: string, text: string): Promise<void>;
isConnected(): boolean;
ownsJid(jid: string): boolean;
disconnect(): Promise<void>;
// Optional: typing indicator. Channels that support it implement it.
setTyping?(jid: string, isTyping: boolean): Promise<void>;
// Whether to prefix outbound messages with the assistant name.
// Telegram bots already display their name, so they return false.
// WhatsApp returns true. Default true if not implemented.
prefixAssistantName?: boolean;
}
// Callback type that channels use to deliver inbound messages
export type OnInboundMessage = (chatJid: string, message: NewMessage) => void;
// Callback for chat metadata discovery.
// name is optional — channels that deliver names inline (Telegram) pass it here;
// channels that sync names separately (WhatsApp syncGroupMetadata) omit it.
export type OnChatMetadata = (chatJid: string, timestamp: string, name?: string) => void;