Files
Aetheel/scheduler/scheduler.py
Tanmay Karande 41b2f9a593 latest updates
2026-02-15 15:02:58 -05:00

276 lines
7.9 KiB
Python

"""
Aetheel Scheduler
=================
APScheduler-based task scheduler with SQLite persistence.
Supports:
- One-shot delayed jobs (replaces threading.Timer reminders)
- Recurring cron jobs (cron expressions)
- Persistent storage (jobs survive restarts)
- Callback-based execution (fires handlers with job context)
Usage:
from scheduler import Scheduler
scheduler = Scheduler(callback=my_handler)
scheduler.start()
# One-shot reminder
scheduler.add_once(
delay_minutes=5,
prompt="Time to stretch!",
channel_id="C123",
channel_type="slack",
)
# Recurring cron job
scheduler.add_cron(
cron_expr="0 9 * * *",
prompt="Good morning! Here's your daily summary.",
channel_id="C123",
channel_type="slack",
)
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Callable

from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger

from scheduler.store import JobStore, ScheduledJob
# Module-level logger shared by everything in this file; it uses the fixed
# name "aetheel.scheduler" rather than __name__.
logger = logging.getLogger("aetheel.scheduler")
# Callback type: receives the ScheduledJob when it fires.
# The callback's return value is ignored by the scheduler.
JobCallback = Callable[[ScheduledJob], None]
class Scheduler:
"""
APScheduler-based task scheduler.
Wraps BackgroundScheduler with SQLite persistence. When a job fires,
it calls the registered callback with the ScheduledJob details. The
callback is responsible for routing the job's prompt to the AI handler
and sending the response to the right channel.
"""
def __init__(
self,
callback: JobCallback,
db_path: str | None = None,
):
self._callback = callback
self._store = JobStore(db_path=db_path)
self._scheduler = BackgroundScheduler(
daemon=True,
job_defaults={"misfire_grace_time": 60},
)
self._running = False
@property
def running(self) -> bool:
return self._running
def start(self) -> None:
"""Start the scheduler and restore persisted jobs."""
if self._running:
return
self._scheduler.start()
self._running = True
# Restore recurring jobs from the database
restored = 0
for job in self._store.list_recurring():
try:
self._register_cron_job(job)
restored += 1
except Exception as e:
logger.warning(f"Failed to restore job {job.id}: {e}")
logger.info(f"Scheduler started (restored {restored} recurring jobs)")
def stop(self) -> None:
"""Shut down the scheduler."""
if self._running:
self._scheduler.shutdown(wait=False)
self._running = False
logger.info("Scheduler stopped")
# -------------------------------------------------------------------
# Public API: Add jobs
# -------------------------------------------------------------------
def add_once(
self,
*,
delay_minutes: int,
prompt: str,
channel_id: str,
channel_type: str = "slack",
thread_id: str | None = None,
user_name: str | None = None,
) -> str:
"""
Schedule a one-shot job to fire after a delay.
Replaces the old threading.Timer-based _schedule_reminder().
Returns the job ID.
"""
job_id = JobStore.new_id()
run_at = datetime.now(timezone.utc) + timedelta(minutes=delay_minutes)
job = ScheduledJob(
id=job_id,
cron_expr=None,
prompt=prompt,
channel_id=channel_id,
channel_type=channel_type,
thread_id=thread_id,
user_name=user_name,
created_at=datetime.now(timezone.utc).isoformat(),
next_run=run_at.isoformat(),
)
# Persist
self._store.add(job)
# Schedule
self._scheduler.add_job(
self._fire_job,
trigger=DateTrigger(run_date=run_at),
args=[job_id],
id=f"once-{job_id}",
)
logger.info(
f"⏰ One-shot scheduled: '{prompt[:50]}' in {delay_minutes} min "
f"(id={job_id}, channel={channel_type}/{channel_id})"
)
return job_id
def add_cron(
self,
*,
cron_expr: str,
prompt: str,
channel_id: str,
channel_type: str = "slack",
thread_id: str | None = None,
user_name: str | None = None,
) -> str:
"""
Schedule a recurring cron job.
Args:
cron_expr: Standard cron expression (5 fields: min hour day month weekday)
Returns the job ID.
"""
job_id = JobStore.new_id()
job = ScheduledJob(
id=job_id,
cron_expr=cron_expr,
prompt=prompt,
channel_id=channel_id,
channel_type=channel_type,
thread_id=thread_id,
user_name=user_name,
created_at=datetime.now(timezone.utc).isoformat(),
)
# Persist
self._store.add(job)
# Register with APScheduler
self._register_cron_job(job)
logger.info(
f"🔄 Cron scheduled: '{prompt[:50]}' ({cron_expr}) "
f"(id={job_id}, channel={channel_type}/{channel_id})"
)
return job_id
# -------------------------------------------------------------------
# Public API: Manage jobs
# -------------------------------------------------------------------
def remove(self, job_id: str) -> bool:
"""Remove a job by ID. Returns True if found and removed."""
# Remove from APScheduler
for prefix in ("once-", "cron-"):
try:
self._scheduler.remove_job(f"{prefix}{job_id}")
except Exception:
pass
# Remove from store
return self._store.remove(job_id)
def list_jobs(self) -> list[ScheduledJob]:
"""List all scheduled jobs."""
return self._store.list_all()
def list_recurring(self) -> list[ScheduledJob]:
"""List only recurring cron jobs."""
return self._store.list_recurring()
# -------------------------------------------------------------------
# Internal
# -------------------------------------------------------------------
def _register_cron_job(self, job: ScheduledJob) -> None:
"""Register a cron job with APScheduler."""
if not job.cron_expr:
return
# Parse cron expression (5 fields: min hour day month weekday)
parts = job.cron_expr.strip().split()
if len(parts) != 5:
raise ValueError(
f"Invalid cron expression: '{job.cron_expr}'"
"expected 5 fields (minute hour day month weekday)"
)
trigger = CronTrigger(
minute=parts[0],
hour=parts[1],
day=parts[2],
month=parts[3],
day_of_week=parts[4],
)
self._scheduler.add_job(
self._fire_job,
trigger=trigger,
args=[job.id],
id=f"cron-{job.id}",
replace_existing=True,
)
def _fire_job(self, job_id: str) -> None:
"""Called by APScheduler when a job triggers."""
job = self._store.get(job_id)
if not job:
logger.warning(f"Job {job_id} not found in store — skipping")
return
logger.info(
f"🔔 Job fired: {job_id} (cron={job.cron_expr}, "
f"prompt='{job.prompt[:50]}')"
)
try:
self._callback(job)
except Exception as e:
logger.error(f"Job callback failed for {job_id}: {e}", exc_info=True)
# Clean up one-shot jobs after firing
if not job.is_recurring:
self._store.remove(job_id)