import { describe, it, expect, beforeEach } from "vitest";
import { ChannelQueue } from "../../src/channel-queue.js";

/**
 * Behavioural tests for ChannelQueue.
 *
 * The cases below check that:
 *  - tasks enqueued on the same channel run one at a time, in FIFO order;
 *  - tasks enqueued on different channels may overlap;
 *  - a task's rejection reaches the caller of `enqueue` and does not stop
 *    later tasks on that channel from running;
 *  - `drainAll` resolves only after every queued task has finished.
 */
describe("ChannelQueue", () => {
  let queue: ChannelQueue;

  // A fresh queue per test so no pending work leaks between cases.
  beforeEach(() => {
    queue = new ChannelQueue();
  });

  it("executes a single task immediately", async () => {
    let executed = false;

    await queue.enqueue("ch-1", async () => {
      executed = true;
    });

    expect(executed).toBe(true);
  });

  it("enqueue resolves when the task completes", async () => {
    const order: number[] = [];

    const p = queue.enqueue("ch-1", async () => {
      await delay(20);
      order.push(1);
    });
    await p;

    // The push happens after the delay, so it is only visible if the
    // returned promise waited for the task body to finish.
    expect(order).toEqual([1]);
  });

  it("executes tasks for the same channel sequentially in FIFO order", async () => {
    const order: number[] = [];

    // Delays decrease (30ms, 10ms, none): if the tasks overlapped, the later
    // ones would finish first and the recorded order would be reversed.
    const p1 = queue.enqueue("ch-1", async () => {
      await delay(30);
      order.push(1);
    });
    const p2 = queue.enqueue("ch-1", async () => {
      await delay(10);
      order.push(2);
    });
    const p3 = queue.enqueue("ch-1", async () => {
      order.push(3);
    });

    await Promise.all([p1, p2, p3]);

    expect(order).toEqual([1, 2, 3]);
  });

  it("executes tasks for different channels concurrently", async () => {
    const order: string[] = [];

    const p1 = queue.enqueue("ch-1", async () => {
      await delay(40);
      order.push("ch-1");
    });
    const p2 = queue.enqueue("ch-2", async () => {
      await delay(10);
      order.push("ch-2");
    });

    await Promise.all([p1, p2]);

    // ch-2 should finish first since it has a shorter delay
    expect(order).toEqual(["ch-2", "ch-1"]);
  });

  it("no concurrent execution within the same channel", async () => {
    let concurrent = 0;
    let maxConcurrent = 0;

    // Each task tracks how many tasks are inside their body at the same time.
    const makeTask = () => async () => {
      concurrent++;
      maxConcurrent = Math.max(maxConcurrent, concurrent);
      await delay(10);
      concurrent--;
    };

    const promises = [
      queue.enqueue("ch-1", makeTask()),
      queue.enqueue("ch-1", makeTask()),
      queue.enqueue("ch-1", makeTask()),
    ];
    await Promise.all(promises);

    expect(maxConcurrent).toBe(1);
  });

  it("propagates task errors to the enqueue caller", async () => {
    await expect(
      queue.enqueue("ch-1", async () => {
        throw new Error("boom");
      })
    ).rejects.toThrow("boom");
  });

  it("continues processing after a task error", async () => {
    let secondRan = false;

    // The rejection is swallowed on purpose: this test only cares that the
    // next task on the same channel still runs.
    const p1 = queue
      .enqueue("ch-1", async () => {
        throw new Error("fail");
      })
      .catch(() => {});
    const p2 = queue.enqueue("ch-1", async () => {
      secondRan = true;
    });

    await Promise.all([p1, p2]);

    expect(secondRan).toBe(true);
  });

  it("drainAll resolves immediately when no tasks are queued", async () => {
    // Passes as long as the returned promise settles without rejecting.
    await queue.drainAll();
  });

  it("drainAll waits for all in-flight and queued tasks", async () => {
    const order: string[] = [];

    // Fire-and-forget on purpose: drainAll (not the individual promises)
    // is what this test waits on.
    void queue.enqueue("ch-1", async () => {
      await delay(20);
      order.push("ch-1-a");
    });
    void queue.enqueue("ch-1", async () => {
      order.push("ch-1-b");
    });
    void queue.enqueue("ch-2", async () => {
      await delay(10);
      order.push("ch-2-a");
    });

    await queue.drainAll();

    expect(order).toContain("ch-1-a");
    expect(order).toContain("ch-1-b");
    expect(order).toContain("ch-2-a");
    expect(order.length).toBe(3);
  });
});

/**
 * Returns a promise that resolves once a `setTimeout` of `ms` milliseconds
 * fires.
 *
 * @param ms - Timeout passed to `setTimeout`, in milliseconds.
 * @returns A promise that resolves with no value.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}