""" Aetheel Scheduler — SQLite Job Store ===================================== Persistent storage for scheduled jobs. Schema: jobs(id, cron_expr, prompt, channel_id, channel_type, created_at, next_run) """ import json import logging import os import sqlite3 import uuid from dataclasses import dataclass from datetime import datetime, timezone logger = logging.getLogger("aetheel.scheduler.store") @dataclass class ScheduledJob: """A persisted scheduled job.""" id: str cron_expr: str | None # None for one-shot jobs prompt: str channel_id: str channel_type: str # "slack", "telegram", etc. created_at: str next_run: str | None = None thread_id: str | None = None # for threading context user_name: str | None = None # who created the job @property def is_recurring(self) -> bool: return self.cron_expr is not None # --------------------------------------------------------------------------- # SQLite Store # --------------------------------------------------------------------------- CREATE_TABLE_SQL = """ CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, cron_expr TEXT, prompt TEXT NOT NULL, channel_id TEXT NOT NULL, channel_type TEXT NOT NULL DEFAULT 'slack', thread_id TEXT, user_name TEXT, created_at TEXT NOT NULL, next_run TEXT ); """ class JobStore: """SQLite-backed persistent job store.""" def __init__(self, db_path: str | None = None): self._db_path = db_path or os.path.join( os.path.expanduser("~/.aetheel"), "scheduler.db" ) # Ensure directory exists os.makedirs(os.path.dirname(self._db_path), exist_ok=True) self._init_db() def _init_db(self) -> None: """Initialize the database schema.""" with sqlite3.connect(self._db_path) as conn: conn.execute(CREATE_TABLE_SQL) conn.commit() logger.info(f"Scheduler DB initialized: {self._db_path}") def _conn(self) -> sqlite3.Connection: conn = sqlite3.connect(self._db_path) conn.row_factory = sqlite3.Row return conn def add(self, job: ScheduledJob) -> str: """Add a job to the store. Returns the job ID.""" with self._conn() as conn: conn.execute( """ INSERT INTO jobs (id, cron_expr, prompt, channel_id, channel_type, thread_id, user_name, created_at, next_run) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( job.id, job.cron_expr, job.prompt, job.channel_id, job.channel_type, job.thread_id, job.user_name, job.created_at, job.next_run, ), ) conn.commit() logger.info(f"Job added: {job.id} (cron={job.cron_expr})") return job.id def remove(self, job_id: str) -> bool: """Remove a job by ID. Returns True if found and removed.""" with self._conn() as conn: cursor = conn.execute("DELETE FROM jobs WHERE id = ?", (job_id,)) conn.commit() removed = cursor.rowcount > 0 if removed: logger.info(f"Job removed: {job_id}") return removed def get(self, job_id: str) -> ScheduledJob | None: """Get a job by ID.""" with self._conn() as conn: row = conn.execute( "SELECT * FROM jobs WHERE id = ?", (job_id,) ).fetchone() return self._row_to_job(row) if row else None def list_all(self) -> list[ScheduledJob]: """List all jobs.""" with self._conn() as conn: rows = conn.execute( "SELECT * FROM jobs ORDER BY created_at" ).fetchall() return [self._row_to_job(row) for row in rows] def list_recurring(self) -> list[ScheduledJob]: """List only recurring (cron) jobs.""" with self._conn() as conn: rows = conn.execute( "SELECT * FROM jobs WHERE cron_expr IS NOT NULL ORDER BY created_at" ).fetchall() return [self._row_to_job(row) for row in rows] def clear_oneshot(self) -> int: """Remove all one-shot (non-cron) jobs. Returns count removed.""" with self._conn() as conn: cursor = conn.execute( "DELETE FROM jobs WHERE cron_expr IS NULL" ) conn.commit() return cursor.rowcount @staticmethod def _row_to_job(row: sqlite3.Row) -> ScheduledJob: return ScheduledJob( id=row["id"], cron_expr=row["cron_expr"], prompt=row["prompt"], channel_id=row["channel_id"], channel_type=row["channel_type"], thread_id=row["thread_id"], user_name=row["user_name"], created_at=row["created_at"], next_run=row["next_run"], ) @staticmethod def new_id() -> str: """Generate a short unique job ID.""" return uuid.uuid4().hex[:8]