// Files
// aetheel-2/tests/unit/channel-queue.test.ts
//
// 130 lines
// 3.4 KiB
// TypeScript

import { describe, it, expect, beforeEach } from "vitest";
import { ChannelQueue } from "../../src/channel-queue.js";
/**
 * Unit tests for ChannelQueue.
 *
 * Behaviors these cases expect from the queue:
 *  - tasks enqueued under the same channel id run one at a time, in FIFO order;
 *  - tasks enqueued under different channel ids may overlap;
 *  - a task's rejection surfaces through the promise returned by `enqueue`
 *    and does not prevent later tasks on that channel from running;
 *  - `drainAll` settles only after everything enqueued so far has finished.
 */
describe("ChannelQueue", () => {
  let queue: ChannelQueue;

  // A fresh queue per test so pending work never leaks between cases.
  beforeEach(() => {
    queue = new ChannelQueue();
  });

  it("executes a single task immediately", async () => {
    let executed = false;

    await queue.enqueue("ch-1", async () => {
      executed = true;
    });

    expect(executed).toBe(true);
  });

  it("enqueue resolves when the task completes", async () => {
    const order: number[] = [];

    const p = queue.enqueue("ch-1", async () => {
      await delay(20);
      order.push(1);
    });
    await p;

    // The push happens after the delay, so it is only visible here if the
    // returned promise waited for the task body to finish.
    expect(order).toEqual([1]);
  });

  it("executes tasks for the same channel sequentially in FIFO order", async () => {
    const order: number[] = [];

    // Delays shrink with each task: if the tasks overlapped, completion
    // order would be 3, 2, 1 instead of the enqueue order.
    const p1 = queue.enqueue("ch-1", async () => {
      await delay(30);
      order.push(1);
    });
    const p2 = queue.enqueue("ch-1", async () => {
      await delay(10);
      order.push(2);
    });
    const p3 = queue.enqueue("ch-1", async () => {
      order.push(3);
    });

    await Promise.all([p1, p2, p3]);
    expect(order).toEqual([1, 2, 3]);
  });

  it("executes tasks for different channels concurrently", async () => {
    const order: string[] = [];

    const p1 = queue.enqueue("ch-1", async () => {
      await delay(40);
      order.push("ch-1");
    });
    const p2 = queue.enqueue("ch-2", async () => {
      await delay(10);
      order.push("ch-2");
    });

    await Promise.all([p1, p2]);
    // ch-2 should finish first since it has a shorter delay
    expect(order).toEqual(["ch-2", "ch-1"]);
  });

  it("no concurrent execution within the same channel", async () => {
    let concurrent = 0;
    let maxConcurrent = 0;

    // Each task bumps a live counter for the duration of its body; the
    // high-water mark reveals any overlap between tasks.
    const makeTask = () => async () => {
      concurrent++;
      maxConcurrent = Math.max(maxConcurrent, concurrent);
      await delay(10);
      concurrent--;
    };

    const promises = [
      queue.enqueue("ch-1", makeTask()),
      queue.enqueue("ch-1", makeTask()),
      queue.enqueue("ch-1", makeTask()),
    ];
    await Promise.all(promises);

    expect(maxConcurrent).toBe(1);
  });

  it("propagates task errors to the enqueue caller", async () => {
    await expect(
      queue.enqueue("ch-1", async () => {
        throw new Error("boom");
      })
    ).rejects.toThrow("boom");
  });

  it("continues processing after a task error", async () => {
    let secondRan = false;

    // The first task's rejection is swallowed on purpose: this case is only
    // about whether the next task on the same channel still runs.
    const p1 = queue
      .enqueue("ch-1", async () => {
        throw new Error("fail");
      })
      .catch(() => {});
    const p2 = queue.enqueue("ch-1", async () => {
      secondRan = true;
    });

    await Promise.all([p1, p2]);
    expect(secondRan).toBe(true);
  });

  it("drainAll resolves immediately when no tasks are queued", async () => {
    // No explicit assertion: the case passes as long as the returned promise
    // settles without rejecting (a hang would hit the test timeout).
    await queue.drainAll();
  });

  it("drainAll waits for all in-flight and queued tasks", async () => {
    const order: string[] = [];

    // Fire-and-forget on purpose: completion is observed through drainAll(),
    // so the returned promises are explicitly discarded with `void`.
    void queue.enqueue("ch-1", async () => {
      await delay(20);
      order.push("ch-1-a");
    });
    void queue.enqueue("ch-1", async () => {
      order.push("ch-1-b");
    });
    void queue.enqueue("ch-2", async () => {
      await delay(10);
      order.push("ch-2-a");
    });

    await queue.drainAll();

    expect(order).toContain("ch-1-a");
    expect(order).toContain("ch-1-b");
    expect(order).toContain("ch-2-a");
    expect(order.length).toBe(3);
    // Draining must not disturb per-channel FIFO ordering.
    expect(order.indexOf("ch-1-a")).toBeLessThan(order.indexOf("ch-1-b"));
  });
});
/**
 * Returns a promise that resolves (with no value) once a `setTimeout`
 * timer of `ms` milliseconds fires.
 *
 * @param ms - Timer duration in milliseconds, passed straight to `setTimeout`.
 * @returns A promise that resolves when the timer callback runs.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}