130 lines
3.4 KiB
TypeScript
130 lines
3.4 KiB
TypeScript
import { describe, it, expect, beforeEach } from "vitest";
|
|
import { ChannelQueue } from "../../src/channel-queue.js";
|
|
|
|
describe("ChannelQueue", () => {
|
|
let queue: ChannelQueue;
|
|
|
|
beforeEach(() => {
|
|
queue = new ChannelQueue();
|
|
});
|
|
|
|
it("executes a single task immediately", async () => {
|
|
let executed = false;
|
|
await queue.enqueue("ch-1", async () => { executed = true; });
|
|
expect(executed).toBe(true);
|
|
});
|
|
|
|
it("enqueue resolves when the task completes", async () => {
|
|
const order: number[] = [];
|
|
const p = queue.enqueue("ch-1", async () => {
|
|
await delay(20);
|
|
order.push(1);
|
|
});
|
|
await p;
|
|
expect(order).toEqual([1]);
|
|
});
|
|
|
|
it("executes tasks for the same channel sequentially in FIFO order", async () => {
|
|
const order: number[] = [];
|
|
|
|
const p1 = queue.enqueue("ch-1", async () => {
|
|
await delay(30);
|
|
order.push(1);
|
|
});
|
|
const p2 = queue.enqueue("ch-1", async () => {
|
|
await delay(10);
|
|
order.push(2);
|
|
});
|
|
const p3 = queue.enqueue("ch-1", async () => {
|
|
order.push(3);
|
|
});
|
|
|
|
await Promise.all([p1, p2, p3]);
|
|
expect(order).toEqual([1, 2, 3]);
|
|
});
|
|
|
|
it("executes tasks for different channels concurrently", async () => {
|
|
const order: string[] = [];
|
|
|
|
const p1 = queue.enqueue("ch-1", async () => {
|
|
await delay(40);
|
|
order.push("ch-1");
|
|
});
|
|
const p2 = queue.enqueue("ch-2", async () => {
|
|
await delay(10);
|
|
order.push("ch-2");
|
|
});
|
|
|
|
await Promise.all([p1, p2]);
|
|
// ch-2 should finish first since it has a shorter delay
|
|
expect(order).toEqual(["ch-2", "ch-1"]);
|
|
});
|
|
|
|
it("no concurrent execution within the same channel", async () => {
|
|
let concurrent = 0;
|
|
let maxConcurrent = 0;
|
|
|
|
const makeTask = () => async () => {
|
|
concurrent++;
|
|
maxConcurrent = Math.max(maxConcurrent, concurrent);
|
|
await delay(10);
|
|
concurrent--;
|
|
};
|
|
|
|
const promises = [
|
|
queue.enqueue("ch-1", makeTask()),
|
|
queue.enqueue("ch-1", makeTask()),
|
|
queue.enqueue("ch-1", makeTask()),
|
|
];
|
|
|
|
await Promise.all(promises);
|
|
expect(maxConcurrent).toBe(1);
|
|
});
|
|
|
|
it("propagates task errors to the enqueue caller", async () => {
|
|
await expect(
|
|
queue.enqueue("ch-1", async () => { throw new Error("boom"); })
|
|
).rejects.toThrow("boom");
|
|
});
|
|
|
|
it("continues processing after a task error", async () => {
|
|
let secondRan = false;
|
|
|
|
const p1 = queue.enqueue("ch-1", async () => { throw new Error("fail"); }).catch(() => {});
|
|
const p2 = queue.enqueue("ch-1", async () => { secondRan = true; });
|
|
|
|
await Promise.all([p1, p2]);
|
|
expect(secondRan).toBe(true);
|
|
});
|
|
|
|
it("drainAll resolves immediately when no tasks are queued", async () => {
|
|
await queue.drainAll();
|
|
});
|
|
|
|
it("drainAll waits for all in-flight and queued tasks", async () => {
|
|
const order: string[] = [];
|
|
|
|
queue.enqueue("ch-1", async () => {
|
|
await delay(20);
|
|
order.push("ch-1-a");
|
|
});
|
|
queue.enqueue("ch-1", async () => {
|
|
order.push("ch-1-b");
|
|
});
|
|
queue.enqueue("ch-2", async () => {
|
|
await delay(10);
|
|
order.push("ch-2-a");
|
|
});
|
|
|
|
await queue.drainAll();
|
|
expect(order).toContain("ch-1-a");
|
|
expect(order).toContain("ch-1-b");
|
|
expect(order).toContain("ch-2-a");
|
|
expect(order.length).toBe(3);
|
|
});
|
|
});
|
|
|
|
function delay(ms: number): Promise<void> {
|
|
return new Promise((resolve) => setTimeout(resolve, ms));
|
|
}
|