import { describe, it, expect } from "vitest"; import { EventQueue, type Event, type EventType } from "../../src/event-queue.js"; function makeEvent(type: EventType = "message", source = "test"): Omit { if (type === "message") { return { type, payload: { prompt: { text: "hello", channelId: "ch1", userId: "u1", guildId: null } }, source }; } if (type === "heartbeat") { return { type, payload: { instruction: "check email", checkName: "email-check" }, source }; } if (type === "cron") { return { type, payload: { instruction: "run report", jobName: "daily-report" }, source }; } return { type, payload: { hookType: "startup" as const }, source }; } describe("EventQueue", () => { it("enqueue assigns monotonically increasing IDs", () => { const q = new EventQueue(10); const e1 = q.enqueue(makeEvent()); const e2 = q.enqueue(makeEvent()); const e3 = q.enqueue(makeEvent()); expect(e1).not.toBeNull(); expect(e2).not.toBeNull(); expect(e3).not.toBeNull(); expect(e1!.id).toBe(1); expect(e2!.id).toBe(2); expect(e3!.id).toBe(3); }); it("enqueue assigns timestamps", () => { const q = new EventQueue(10); const e = q.enqueue(makeEvent()); expect(e).not.toBeNull(); expect(e!.timestamp).toBeInstanceOf(Date); }); it("returns null when queue is at max depth", () => { const q = new EventQueue(2); expect(q.enqueue(makeEvent())).not.toBeNull(); expect(q.enqueue(makeEvent())).not.toBeNull(); expect(q.enqueue(makeEvent())).toBeNull(); expect(q.size()).toBe(2); }); it("dequeue returns events in FIFO order", () => { const q = new EventQueue(10); q.enqueue(makeEvent("message")); q.enqueue(makeEvent("heartbeat")); q.enqueue(makeEvent("cron")); expect(q.dequeue()!.type).toBe("message"); expect(q.dequeue()!.type).toBe("heartbeat"); expect(q.dequeue()!.type).toBe("cron"); }); it("dequeue returns undefined when empty", () => { const q = new EventQueue(10); expect(q.dequeue()).toBeUndefined(); }); it("size returns current queue depth", () => { const q = new EventQueue(10); expect(q.size()).toBe(0); q.enqueue(makeEvent()); expect(q.size()).toBe(1); q.enqueue(makeEvent()); expect(q.size()).toBe(2); q.dequeue(); expect(q.size()).toBe(1); }); it("onEvent handler processes events sequentially", async () => { const q = new EventQueue(10); const processed: number[] = []; q.enqueue(makeEvent()); q.enqueue(makeEvent()); q.enqueue(makeEvent()); q.onEvent(async (event) => { processed.push(event.id); await new Promise((r) => setTimeout(r, 10)); }); await q.drain(); expect(processed).toEqual([1, 2, 3]); }); it("onEvent auto-processes newly enqueued events", async () => { const q = new EventQueue(10); const processed: number[] = []; q.onEvent(async (event) => { processed.push(event.id); }); q.enqueue(makeEvent()); q.enqueue(makeEvent()); await q.drain(); expect(processed).toEqual([1, 2]); }); it("drain resolves immediately when queue is empty and not processing", async () => { const q = new EventQueue(10); await q.drain(); // should not hang }); it("drain waits for in-flight processing to complete", async () => { const q = new EventQueue(10); let handlerDone = false; q.onEvent(async () => { await new Promise((r) => setTimeout(r, 50)); handlerDone = true; }); q.enqueue(makeEvent()); await q.drain(); expect(handlerDone).toBe(true); }); it("handler errors do not block subsequent processing", async () => { const q = new EventQueue(10); const processed: number[] = []; q.onEvent(async (event) => { if (event.id === 1) throw new Error("fail"); processed.push(event.id); }); q.enqueue(makeEvent()); q.enqueue(makeEvent()); await q.drain(); expect(processed).toEqual([2]); }); it("accepts all event types", () => { const q = new EventQueue(10); const types: EventType[] = ["message", "heartbeat", "cron", "hook", "webhook"]; for (const type of types) { const e = q.enqueue(makeEvent(type)); expect(e).not.toBeNull(); expect(e!.type).toBe(type); } }); });