""" Aetheel Scheduler ================= APScheduler-based task scheduler with SQLite persistence. Supports: - One-shot delayed jobs (replaces threading.Timer reminders) - Recurring cron jobs (cron expressions) - Persistent storage (jobs survive restarts) - Callback-based execution (fires handlers with job context) Usage: from scheduler import Scheduler scheduler = Scheduler(callback=my_handler) scheduler.start() # One-shot reminder scheduler.add_once( delay_minutes=5, prompt="Time to stretch!", channel_id="C123", channel_type="slack", ) # Recurring cron job scheduler.add_cron( cron_expr="0 9 * * *", prompt="Good morning! Here's your daily summary.", channel_id="C123", channel_type="slack", ) """ import logging from datetime import datetime, timedelta, timezone from typing import Callable from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from scheduler.store import JobStore, ScheduledJob logger = logging.getLogger("aetheel.scheduler") # Callback type: receives the ScheduledJob when it fires JobCallback = Callable[[ScheduledJob], None] class Scheduler: """ APScheduler-based task scheduler. Wraps BackgroundScheduler with SQLite persistence. When a job fires, it calls the registered callback with the ScheduledJob details. The callback is responsible for routing the job's prompt to the AI handler and sending the response to the right channel. """ def __init__( self, callback: JobCallback, db_path: str | None = None, ): self._callback = callback self._store = JobStore(db_path=db_path) self._scheduler = BackgroundScheduler( daemon=True, job_defaults={"misfire_grace_time": 60}, ) self._running = False @property def running(self) -> bool: return self._running def start(self) -> None: """Start the scheduler and restore persisted jobs.""" if self._running: return self._scheduler.start() self._running = True # Restore recurring jobs from the database restored = 0 for job in self._store.list_recurring(): try: self._register_cron_job(job) restored += 1 except Exception as e: logger.warning(f"Failed to restore job {job.id}: {e}") logger.info(f"Scheduler started (restored {restored} recurring jobs)") def stop(self) -> None: """Shut down the scheduler.""" if self._running: self._scheduler.shutdown(wait=False) self._running = False logger.info("Scheduler stopped") # ------------------------------------------------------------------- # Public API: Add jobs # ------------------------------------------------------------------- def add_once( self, *, delay_minutes: int, prompt: str, channel_id: str, channel_type: str = "slack", thread_id: str | None = None, user_name: str | None = None, ) -> str: """ Schedule a one-shot job to fire after a delay. Replaces the old threading.Timer-based _schedule_reminder(). Returns the job ID. """ job_id = JobStore.new_id() run_at = datetime.now(timezone.utc) + timedelta(minutes=delay_minutes) job = ScheduledJob( id=job_id, cron_expr=None, prompt=prompt, channel_id=channel_id, channel_type=channel_type, thread_id=thread_id, user_name=user_name, created_at=datetime.now(timezone.utc).isoformat(), next_run=run_at.isoformat(), ) # Persist self._store.add(job) # Schedule self._scheduler.add_job( self._fire_job, trigger=DateTrigger(run_date=run_at), args=[job_id], id=f"once-{job_id}", ) logger.info( f"⏰ One-shot scheduled: '{prompt[:50]}' in {delay_minutes} min " f"(id={job_id}, channel={channel_type}/{channel_id})" ) return job_id def add_cron( self, *, cron_expr: str, prompt: str, channel_id: str, channel_type: str = "slack", thread_id: str | None = None, user_name: str | None = None, ) -> str: """ Schedule a recurring cron job. Args: cron_expr: Standard cron expression (5 fields: min hour day month weekday) Returns the job ID. """ job_id = JobStore.new_id() job = ScheduledJob( id=job_id, cron_expr=cron_expr, prompt=prompt, channel_id=channel_id, channel_type=channel_type, thread_id=thread_id, user_name=user_name, created_at=datetime.now(timezone.utc).isoformat(), ) # Persist self._store.add(job) # Register with APScheduler self._register_cron_job(job) logger.info( f"🔄 Cron scheduled: '{prompt[:50]}' ({cron_expr}) " f"(id={job_id}, channel={channel_type}/{channel_id})" ) return job_id # ------------------------------------------------------------------- # Public API: Manage jobs # ------------------------------------------------------------------- def remove(self, job_id: str) -> bool: """Remove a job by ID. Returns True if found and removed.""" # Remove from APScheduler for prefix in ("once-", "cron-"): try: self._scheduler.remove_job(f"{prefix}{job_id}") except Exception: pass # Remove from store return self._store.remove(job_id) def list_jobs(self) -> list[ScheduledJob]: """List all scheduled jobs.""" return self._store.list_all() def list_recurring(self) -> list[ScheduledJob]: """List only recurring cron jobs.""" return self._store.list_recurring() def remove_by_channel_type(self, channel_type: str) -> int: """Remove all jobs with the given channel_type. Returns count removed.""" ids = self._store.remove_by_channel_type(channel_type) for job_id in ids: for prefix in ("once-", "cron-"): try: self._scheduler.remove_job(f"{prefix}{job_id}") except Exception: pass return len(ids) # ------------------------------------------------------------------- # Internal # ------------------------------------------------------------------- def _register_cron_job(self, job: ScheduledJob) -> None: """Register a cron job with APScheduler.""" if not job.cron_expr: return # Parse cron expression (5 fields: min hour day month weekday) parts = job.cron_expr.strip().split() if len(parts) != 5: raise ValueError( f"Invalid cron expression: '{job.cron_expr}' — " "expected 5 fields (minute hour day month weekday)" ) trigger = CronTrigger( minute=parts[0], hour=parts[1], day=parts[2], month=parts[3], day_of_week=parts[4], ) self._scheduler.add_job( self._fire_job, trigger=trigger, args=[job.id], id=f"cron-{job.id}", replace_existing=True, ) def _fire_job(self, job_id: str) -> None: """Called by APScheduler when a job triggers.""" job = self._store.get(job_id) if not job: logger.warning(f"Job {job_id} not found in store — skipping") return logger.info( f"🔔 Job fired: {job_id} (cron={job.cron_expr}, " f"prompt='{job.prompt[:50]}')" ) try: self._callback(job) except Exception as e: logger.error(f"Job callback failed for {job_id}: {e}", exc_info=True) # Clean up one-shot jobs after firing if not job.is_recurring: self._store.remove(job_id)