Extract database operations into separate db.ts module
- src/db.ts: initDatabase, closeDatabase, storeMessage, getNewMessages - Removes SQL from index.ts - Database initialization happens once at startup Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
78
src/db.ts
Normal file
78
src/db.ts
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
import Database from 'better-sqlite3';
|
||||||
|
import fs from 'fs';
|
||||||
|
import path from 'path';
|
||||||
|
import { proto } from '@whiskeysockets/baileys';
|
||||||
|
import { NewMessage } from './types.js';
|
||||||
|
import { STORE_DIR } from './config.js';
|
||||||
|
|
||||||
|
let db: Database.Database;
|
||||||
|
|
||||||
|
export function initDatabase(): void {
|
||||||
|
const dbPath = path.join(STORE_DIR, 'messages.db');
|
||||||
|
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
|
||||||
|
|
||||||
|
db = new Database(dbPath);
|
||||||
|
db.exec(`
|
||||||
|
CREATE TABLE IF NOT EXISTS chats (
|
||||||
|
jid TEXT PRIMARY KEY,
|
||||||
|
name TEXT,
|
||||||
|
last_message_time TEXT
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS messages (
|
||||||
|
id TEXT,
|
||||||
|
chat_jid TEXT,
|
||||||
|
sender TEXT,
|
||||||
|
content TEXT,
|
||||||
|
timestamp TEXT,
|
||||||
|
is_from_me INTEGER,
|
||||||
|
PRIMARY KEY (id, chat_jid),
|
||||||
|
FOREIGN KEY (chat_jid) REFERENCES chats(jid)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
|
||||||
|
`);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function closeDatabase(): void {
|
||||||
|
db.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
export function storeMessage(msg: proto.IWebMessageInfo, chatJid: string, isFromMe: boolean): void {
|
||||||
|
if (!msg.key) return;
|
||||||
|
|
||||||
|
const content =
|
||||||
|
msg.message?.conversation ||
|
||||||
|
msg.message?.extendedTextMessage?.text ||
|
||||||
|
msg.message?.imageMessage?.caption ||
|
||||||
|
msg.message?.videoMessage?.caption ||
|
||||||
|
'';
|
||||||
|
|
||||||
|
const timestamp = new Date(Number(msg.messageTimestamp) * 1000).toISOString();
|
||||||
|
const sender = msg.key.participant || msg.key.remoteJid || '';
|
||||||
|
const msgId = msg.key.id || '';
|
||||||
|
|
||||||
|
db.prepare(`INSERT OR REPLACE INTO chats (jid, name, last_message_time) VALUES (?, ?, ?)`)
|
||||||
|
.run(chatJid, chatJid, timestamp);
|
||||||
|
db.prepare(`INSERT OR REPLACE INTO messages (id, chat_jid, sender, content, timestamp, is_from_me) VALUES (?, ?, ?, ?, ?, ?)`)
|
||||||
|
.run(msgId, chatJid, sender, content, timestamp, isFromMe ? 1 : 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getNewMessages(jids: string[], lastTimestamp: string): { messages: NewMessage[]; newTimestamp: string } {
|
||||||
|
if (jids.length === 0) return { messages: [], newTimestamp: lastTimestamp };
|
||||||
|
|
||||||
|
const placeholders = jids.map(() => '?').join(',');
|
||||||
|
const sql = `
|
||||||
|
SELECT id, chat_jid, sender, content, timestamp
|
||||||
|
FROM messages
|
||||||
|
WHERE timestamp > ? AND chat_jid IN (${placeholders})
|
||||||
|
ORDER BY timestamp
|
||||||
|
`;
|
||||||
|
|
||||||
|
const rows = db.prepare(sql).all(lastTimestamp, ...jids) as NewMessage[];
|
||||||
|
|
||||||
|
let newTimestamp = lastTimestamp;
|
||||||
|
for (const row of rows) {
|
||||||
|
if (row.timestamp > newTimestamp) newTimestamp = row.timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { messages: rows, newTimestamp };
|
||||||
|
}
|
||||||
81
src/index.ts
81
src/index.ts
@@ -2,12 +2,10 @@ import makeWASocket, {
|
|||||||
useMultiFileAuthState,
|
useMultiFileAuthState,
|
||||||
DisconnectReason,
|
DisconnectReason,
|
||||||
makeCacheableSignalKeyStore,
|
makeCacheableSignalKeyStore,
|
||||||
WASocket,
|
WASocket
|
||||||
proto
|
|
||||||
} from '@whiskeysockets/baileys';
|
} from '@whiskeysockets/baileys';
|
||||||
import { query } from '@anthropic-ai/claude-agent-sdk';
|
import { query } from '@anthropic-ai/claude-agent-sdk';
|
||||||
import pino from 'pino';
|
import pino from 'pino';
|
||||||
import Database from 'better-sqlite3';
|
|
||||||
import { exec } from 'child_process';
|
import { exec } from 'child_process';
|
||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
@@ -22,42 +20,18 @@ import {
|
|||||||
CLEAR_COMMAND
|
CLEAR_COMMAND
|
||||||
} from './config.js';
|
} from './config.js';
|
||||||
import { RegisteredGroup, Session, NewMessage } from './types.js';
|
import { RegisteredGroup, Session, NewMessage } from './types.js';
|
||||||
|
import { initDatabase, closeDatabase, storeMessage, getNewMessages } from './db.js';
|
||||||
|
|
||||||
const logger = pino({
|
const logger = pino({
|
||||||
level: process.env.LOG_LEVEL || 'info',
|
level: process.env.LOG_LEVEL || 'info',
|
||||||
transport: { target: 'pino-pretty', options: { colorize: true } }
|
transport: { target: 'pino-pretty', options: { colorize: true } }
|
||||||
});
|
});
|
||||||
|
|
||||||
let db: Database.Database;
|
|
||||||
let sock: WASocket;
|
let sock: WASocket;
|
||||||
let lastTimestamp = '';
|
let lastTimestamp = '';
|
||||||
let sessions: Session = {};
|
let sessions: Session = {};
|
||||||
let registeredGroups: Record<string, RegisteredGroup> = {};
|
let registeredGroups: Record<string, RegisteredGroup> = {};
|
||||||
|
|
||||||
function initDatabase(dbPath: string): Database.Database {
|
|
||||||
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
|
|
||||||
const database = new Database(dbPath);
|
|
||||||
database.exec(`
|
|
||||||
CREATE TABLE IF NOT EXISTS chats (
|
|
||||||
jid TEXT PRIMARY KEY,
|
|
||||||
name TEXT,
|
|
||||||
last_message_time TEXT
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS messages (
|
|
||||||
id TEXT,
|
|
||||||
chat_jid TEXT,
|
|
||||||
sender TEXT,
|
|
||||||
content TEXT,
|
|
||||||
timestamp TEXT,
|
|
||||||
is_from_me INTEGER,
|
|
||||||
PRIMARY KEY (id, chat_jid),
|
|
||||||
FOREIGN KEY (chat_jid) REFERENCES chats(jid)
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_timestamp ON messages(timestamp);
|
|
||||||
`);
|
|
||||||
return database;
|
|
||||||
}
|
|
||||||
|
|
||||||
function loadJson<T>(filePath: string, defaultValue: T): T {
|
function loadJson<T>(filePath: string, defaultValue: T): T {
|
||||||
try {
|
try {
|
||||||
if (fs.existsSync(filePath)) {
|
if (fs.existsSync(filePath)) {
|
||||||
@@ -88,48 +62,6 @@ function saveState(): void {
|
|||||||
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
|
saveJson(path.join(DATA_DIR, 'sessions.json'), sessions);
|
||||||
}
|
}
|
||||||
|
|
||||||
function storeMessage(msg: proto.IWebMessageInfo, chatJid: string, isFromMe: boolean): void {
|
|
||||||
if (!msg.key) return;
|
|
||||||
|
|
||||||
const content =
|
|
||||||
msg.message?.conversation ||
|
|
||||||
msg.message?.extendedTextMessage?.text ||
|
|
||||||
msg.message?.imageMessage?.caption ||
|
|
||||||
msg.message?.videoMessage?.caption ||
|
|
||||||
'';
|
|
||||||
|
|
||||||
const timestamp = new Date(Number(msg.messageTimestamp) * 1000).toISOString();
|
|
||||||
const sender = msg.key.participant || msg.key.remoteJid || '';
|
|
||||||
const msgId = msg.key.id || '';
|
|
||||||
|
|
||||||
try {
|
|
||||||
db.prepare(`INSERT OR REPLACE INTO chats (jid, name, last_message_time) VALUES (?, ?, ?)`).run(chatJid, chatJid, timestamp);
|
|
||||||
db.prepare(`INSERT OR REPLACE INTO messages (id, chat_jid, sender, content, timestamp, is_from_me) VALUES (?, ?, ?, ?, ?, ?)`).run(msgId, chatJid, sender, content, timestamp, isFromMe ? 1 : 0);
|
|
||||||
logger.debug({ chatJid, msgId }, 'Message stored');
|
|
||||||
} catch (err) {
|
|
||||||
logger.error({ err, msgId }, 'Failed to store message');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function getNewMessages(): NewMessage[] {
|
|
||||||
const jids = Object.keys(registeredGroups);
|
|
||||||
if (jids.length === 0) return [];
|
|
||||||
|
|
||||||
const placeholders = jids.map(() => '?').join(',');
|
|
||||||
const sql = `
|
|
||||||
SELECT id, chat_jid, sender, content, timestamp
|
|
||||||
FROM messages
|
|
||||||
WHERE timestamp > ? AND chat_jid IN (${placeholders})
|
|
||||||
ORDER BY timestamp
|
|
||||||
`;
|
|
||||||
|
|
||||||
const rows = db.prepare(sql).all(lastTimestamp, ...jids) as NewMessage[];
|
|
||||||
for (const row of rows) {
|
|
||||||
if (row.timestamp > lastTimestamp) lastTimestamp = row.timestamp;
|
|
||||||
}
|
|
||||||
return rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function processMessage(msg: NewMessage): Promise<void> {
|
async function processMessage(msg: NewMessage): Promise<void> {
|
||||||
const group = registeredGroups[msg.chat_jid];
|
const group = registeredGroups[msg.chat_jid];
|
||||||
if (!group) return;
|
if (!group) return;
|
||||||
@@ -281,7 +213,10 @@ async function startMessageLoop(): Promise<void> {
|
|||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
const messages = getNewMessages();
|
const jids = Object.keys(registeredGroups);
|
||||||
|
const { messages, newTimestamp } = getNewMessages(jids, lastTimestamp);
|
||||||
|
lastTimestamp = newTimestamp;
|
||||||
|
|
||||||
if (messages.length > 0) logger.info({ count: messages.length }, 'New messages');
|
if (messages.length > 0) logger.info({ count: messages.length }, 'New messages');
|
||||||
for (const msg of messages) await processMessage(msg);
|
for (const msg of messages) await processMessage(msg);
|
||||||
saveState();
|
saveState();
|
||||||
@@ -293,14 +228,14 @@ async function startMessageLoop(): Promise<void> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function main(): Promise<void> {
|
async function main(): Promise<void> {
|
||||||
db = initDatabase(path.join(STORE_DIR, 'messages.db'));
|
initDatabase();
|
||||||
logger.info('Database initialized');
|
logger.info('Database initialized');
|
||||||
loadState();
|
loadState();
|
||||||
await connectWhatsApp();
|
await connectWhatsApp();
|
||||||
|
|
||||||
const shutdown = () => {
|
const shutdown = () => {
|
||||||
logger.info('Shutting down...');
|
logger.info('Shutting down...');
|
||||||
db.close();
|
closeDatabase();
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
};
|
};
|
||||||
process.on('SIGINT', shutdown);
|
process.on('SIGINT', shutdown);
|
||||||
|
|||||||
Reference in New Issue
Block a user