112 lines
3.4 KiB
TypeScript
112 lines
3.4 KiB
TypeScript
import cron from "node-cron";
|
|
import type { Event } from "./event-queue.js";
|
|
import { logger } from "./logger.js";
|
|
|
|
/**
 * One scheduled job, as produced by `CronScheduler.parseConfig` and consumed
 * by `CronScheduler.start`.
 */
export interface CronJob {
  /** Job name, taken from the text of the job's `### <name>` heading. */
  name: string;
  /** Cron schedule expression, taken from the job's `Cron:` line. */
  expression: string;
  /** Instruction text, taken from the job's `Instruction:` line. */
  instruction: string;
}
|
|
|
|
/**
 * Callback through which the scheduler hands a new event to its consumer.
 *
 * The caller supplies every `Event` field except `id` and `timestamp`; the
 * callback returns an `Event` or `null`.
 * NOTE(review): presumably `null` means the event was not accepted — confirm
 * against the event-queue implementation.
 */
type EnqueueFn = (event: Omit<Event, "id" | "timestamp">) => Event | null;
|
|
|
|
/**
 * Parses cron job definitions from a markdown document and schedules them
 * with node-cron, enqueueing a `"cron"` event every time a job fires.
 */
export class CronScheduler {
  /** Level-2 markdown heading (`## Title`); captures the title. */
  private static readonly SECTION_HEADING = /^##\s+(.+)$/;
  /** Level-3 markdown heading (`### Name`); captures the job name. */
  private static readonly JOB_HEADING = /^###\s+(.+)$/;
  /** `Cron: <expression>` line; captures the expression. */
  private static readonly CRON_LINE = /^Cron:\s*(.+)$/;
  /** `Instruction: <text>` line; captures the instruction text. */
  private static readonly INSTRUCTION_LINE = /^Instruction:\s*(.+)$/;

  /** Currently scheduled tasks, keyed by job name. */
  private tasks: Map<string, cron.ScheduledTask> = new Map();

  /**
   * Extracts job definitions from the `## Cron Jobs` section of a markdown
   * document.
   *
   * Inside that section every `### <name>` heading starts a job, and the
   * following `Cron: <expression>` and `Instruction: <text>` lines fill it in.
   * A job is emitted only when all three parts are present; incomplete jobs
   * are dropped silently. Any other `##` heading ends the section.
   *
   * @param content Full text of the document; LF and CRLF line endings are
   *   both accepted.
   * @returns The complete jobs, in document order.
   */
  parseConfig(content: string): CronJob[] {
    const jobs: CronJob[] = [];

    let inCronSection = false;
    let name: string | null = null;
    let expression: string | null = null;
    let instruction: string | null = null;

    // Emits the job being accumulated if it is complete, then resets the state.
    const commit = (): void => {
      if (name !== null && expression !== null && instruction !== null) {
        jobs.push({ name, expression, instruction });
      }
      name = null;
      expression = null;
      instruction = null;
    };

    // Split on LF or CRLF. With a plain "\n" split a CRLF file leaves a
    // trailing "\r" on every line; `.` does not match "\r" and `$` only
    // matches at end of input, so none of the patterns would ever match.
    for (const line of content.split(/\r?\n/)) {
      const sectionMatch = line.match(CronScheduler.SECTION_HEADING);
      if (sectionMatch !== null) {
        // Any "##" heading closes the section we may currently be in.
        if (inCronSection) {
          commit();
        }
        inCronSection = sectionMatch[1].trim() === "Cron Jobs";
        continue;
      }

      if (!inCronSection) {
        continue;
      }

      const jobMatch = line.match(CronScheduler.JOB_HEADING);
      if (jobMatch !== null) {
        // A new "###" heading finishes the previous job and starts the next.
        commit();
        name = jobMatch[1].trim();
        continue;
      }

      // Field lines only count once a job heading has been seen.
      if (name === null) {
        continue;
      }

      const cronMatch = line.match(CronScheduler.CRON_LINE);
      if (cronMatch !== null) {
        expression = cronMatch[1].trim();
        continue;
      }

      const instructionMatch = line.match(CronScheduler.INSTRUCTION_LINE);
      if (instructionMatch !== null) {
        instruction = instructionMatch[1].trim();
      }
    }

    // The last job of the document has no following heading to flush it.
    commit();

    return jobs;
  }

  /**
   * Schedules every job that has a valid cron expression.
   *
   * Each time a job fires, `enqueue` is called with a `"cron"` event carrying
   * the job's instruction and name. Jobs with an invalid expression are
   * logged and skipped. If a task is already registered under the same name
   * it is stopped before being replaced, so that it does not keep firing
   * outside the reach of `stop()`.
   *
   * @param jobs Jobs to schedule, typically the result of `parseConfig`.
   * @param enqueue Callback that receives the event on each tick; its return
   *   value is ignored.
   */
  start(jobs: CronJob[], enqueue: EnqueueFn): void {
    for (const job of jobs) {
      if (!cron.validate(job.expression)) {
        logger.warn({ name: job.name, expression: job.expression }, "Cron job has invalid cron expression, skipping");
        continue;
      }

      // Overwriting the map entry without stopping the old task would leave
      // it running with no handle left to stop it.
      const existing = this.tasks.get(job.name);
      if (existing !== undefined) {
        logger.warn({ name: job.name }, "Cron job name is already scheduled, replacing the existing task");
        existing.stop();
      }

      const task = cron.schedule(job.expression, () => {
        enqueue({
          type: "cron",
          payload: {
            instruction: job.instruction,
            jobName: job.name,
          },
          source: "cron-scheduler",
        });
      });

      this.tasks.set(job.name, task);
    }
  }

  /** Stops every scheduled task and forgets all of them. */
  stop(): void {
    for (const task of this.tasks.values()) {
      task.stop();
    }
    this.tasks.clear();
  }
}
|