Include missed messages when catching up the agent
When a triggered message comes in, fetch all messages in that chat since the last agent interaction and include them in the prompt. Each message is formatted with timestamp and sender. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
10
src/db.ts
10
src/db.ts
@@ -72,3 +72,13 @@ export function getNewMessages(jids: string[], lastTimestamp: string): { message
|
|||||||
|
|
||||||
return { messages: rows, newTimestamp };
|
return { messages: rows, newTimestamp };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function getMessagesSince(chatJid: string, sinceTimestamp: string): NewMessage[] {
|
||||||
|
const sql = `
|
||||||
|
SELECT id, chat_jid, sender, content, timestamp
|
||||||
|
FROM messages
|
||||||
|
WHERE chat_jid = ? AND timestamp > ?
|
||||||
|
ORDER BY timestamp
|
||||||
|
`;
|
||||||
|
return db.prepare(sql).all(chatJid, sinceTimestamp) as NewMessage[];
|
||||||
|
}
|
||||||
|
|||||||
34
src/index.ts
34
src/index.ts
@@ -20,7 +20,7 @@ import {
|
|||||||
CLEAR_COMMAND
|
CLEAR_COMMAND
|
||||||
} from './config.js';
|
} from './config.js';
|
||||||
import { RegisteredGroup, Session, NewMessage } from './types.js';
|
import { RegisteredGroup, Session, NewMessage } from './types.js';
|
||||||
import { initDatabase, storeMessage, getNewMessages } from './db.js';
|
import { initDatabase, storeMessage, getNewMessages, getMessagesSince } from './db.js';
|
||||||
|
|
||||||
const logger = pino({
|
const logger = pino({
|
||||||
level: process.env.LOG_LEVEL || 'info',
|
level: process.env.LOG_LEVEL || 'info',
|
||||||
@@ -31,6 +31,7 @@ let sock: WASocket;
|
|||||||
let lastTimestamp = '';
|
let lastTimestamp = '';
|
||||||
let sessions: Session = {};
|
let sessions: Session = {};
|
||||||
let registeredGroups: Record<string, RegisteredGroup> = {};
|
let registeredGroups: Record<string, RegisteredGroup> = {};
|
||||||
|
let lastAgentTimestamp: Record<string, string> = {};
|
||||||
|
|
||||||
function loadJson<T>(filePath: string, defaultValue: T): T {
|
function loadJson<T>(filePath: string, defaultValue: T): T {
|
||||||
try {
|
try {
|
||||||
@@ -50,15 +51,16 @@ function saveJson(filePath: string, data: unknown): void {
|
|||||||
|
|
||||||
function loadState(): void {
|
function loadState(): void {
|
||||||
const statePath = path.join(DATA_DIR, 'router_state.json');
|
const statePath = path.join(DATA_DIR, 'router_state.json');
|
||||||
const state = loadJson<{ last_timestamp?: string }>(statePath, {});
|
const state = loadJson<{ last_timestamp?: string; last_agent_timestamp?: Record<string, string> }>(statePath, {});
|
||||||
lastTimestamp = state.last_timestamp || '';
|
lastTimestamp = state.last_timestamp || '';
|
||||||
|
lastAgentTimestamp = state.last_agent_timestamp || {};
|
||||||
sessions = loadJson(path.join(DATA_DIR, 'sessions.json'), {});
|
sessions = loadJson(path.join(DATA_DIR, 'sessions.json'), {});
|
||||||
registeredGroups = loadJson(path.join(DATA_DIR, 'registered_groups.json'), {});
|
registeredGroups = loadJson(path.join(DATA_DIR, 'registered_groups.json'), {});
|
||||||
logger.info({ groupCount: Object.keys(registeredGroups).length }, 'State loaded');
|
logger.info({ groupCount: Object.keys(registeredGroups).length }, 'State loaded');
|
||||||
}
|
}
|
||||||
|
|
||||||
function saveState(): void {
|
function saveState(): void {
|
||||||
saveJson(path.join(DATA_DIR, 'router_state.json'), { last_timestamp: lastTimestamp });
|
saveJson(path.join(DATA_DIR, 'router_state.json'), { last_timestamp: lastTimestamp, last_agent_timestamp: lastAgentTimestamp });
|
||||||
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
|
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,11 +88,31 @@ async function processMessage(msg: NewMessage): Promise<void> {
|
|||||||
|
|
||||||
if (!TRIGGER_PATTERN.test(content)) return;
|
if (!TRIGGER_PATTERN.test(content)) return;
|
||||||
|
|
||||||
const prompt = content.replace(TRIGGER_PATTERN, '').trim();
|
const userMessage = content.replace(TRIGGER_PATTERN, '').trim();
|
||||||
if (!prompt) return;
|
if (!userMessage) return;
|
||||||
|
|
||||||
logger.info({ group: group.name, prompt: prompt.slice(0, 50) }, 'Processing message');
|
// Get messages since last agent interaction to catch up the session
|
||||||
|
const sinceTimestamp = lastAgentTimestamp[msg.chat_jid] || '';
|
||||||
|
const missedMessages = getMessagesSince(msg.chat_jid, sinceTimestamp);
|
||||||
|
|
||||||
|
// Build prompt with conversation history
|
||||||
|
let prompt = '';
|
||||||
|
for (const m of missedMessages) {
|
||||||
|
if (m.id === msg.id) continue; // Skip current message, we'll add it at the end
|
||||||
|
const time = new Date(m.timestamp).toLocaleTimeString();
|
||||||
|
const sender = m.sender.split('@')[0]; // Extract phone number or name
|
||||||
|
prompt += `[${time}] ${sender}: ${m.content}\n`;
|
||||||
|
}
|
||||||
|
const time = new Date(msg.timestamp).toLocaleTimeString();
|
||||||
|
const sender = msg.sender.split('@')[0];
|
||||||
|
prompt += `[${time}] ${sender}: ${userMessage}`;
|
||||||
|
|
||||||
|
logger.info({ group: group.name, messageCount: missedMessages.length }, 'Processing message');
|
||||||
const response = await runAgent(group, prompt);
|
const response = await runAgent(group, prompt);
|
||||||
|
|
||||||
|
// Update last agent timestamp
|
||||||
|
lastAgentTimestamp[msg.chat_jid] = msg.timestamp;
|
||||||
|
|
||||||
if (response) {
|
if (response) {
|
||||||
await sendMessage(msg.chat_jid, `${ASSISTANT_NAME}: ${response}`);
|
await sendMessage(msg.chat_jid, `${ASSISTANT_NAME}: ${response}`);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user