latest updates
This commit is contained in:
140
tests/test_scheduler.py
Normal file
140
tests/test_scheduler.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""
|
||||
Tests for the Scheduler Store (SQLite persistence).
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from scheduler.store import JobStore, ScheduledJob
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db_path(tmp_path):
|
||||
"""Provide a temp database path."""
|
||||
return str(tmp_path / "test_scheduler.db")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def store(db_path):
|
||||
"""Create a fresh JobStore."""
|
||||
return JobStore(db_path=db_path)
|
||||
|
||||
|
||||
def _make_job(
|
||||
cron_expr: str | None = "*/5 * * * *",
|
||||
prompt: str = "Test prompt",
|
||||
channel_id: str = "C123",
|
||||
channel_type: str = "slack",
|
||||
) -> ScheduledJob:
|
||||
"""Helper to create a ScheduledJob."""
|
||||
return ScheduledJob(
|
||||
id=JobStore.new_id(),
|
||||
cron_expr=cron_expr,
|
||||
prompt=prompt,
|
||||
channel_id=channel_id,
|
||||
channel_type=channel_type,
|
||||
created_at=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: JobStore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestJobStore:
|
||||
def test_add_and_get(self, store):
|
||||
job = _make_job(prompt="Hello scheduler")
|
||||
store.add(job)
|
||||
retrieved = store.get(job.id)
|
||||
assert retrieved is not None
|
||||
assert retrieved.id == job.id
|
||||
assert retrieved.prompt == "Hello scheduler"
|
||||
assert retrieved.cron_expr == "*/5 * * * *"
|
||||
|
||||
def test_remove(self, store):
|
||||
job = _make_job()
|
||||
store.add(job)
|
||||
assert store.remove(job.id) is True
|
||||
assert store.get(job.id) is None
|
||||
|
||||
def test_remove_nonexistent(self, store):
|
||||
assert store.remove("nonexistent") is False
|
||||
|
||||
def test_list_all(self, store):
|
||||
job1 = _make_job(prompt="Job 1")
|
||||
job2 = _make_job(prompt="Job 2")
|
||||
store.add(job1)
|
||||
store.add(job2)
|
||||
all_jobs = store.list_all()
|
||||
assert len(all_jobs) == 2
|
||||
|
||||
def test_list_recurring(self, store):
|
||||
cron_job = _make_job(cron_expr="0 9 * * *", prompt="Recurring")
|
||||
oneshot_job = _make_job(cron_expr=None, prompt="One-shot")
|
||||
store.add(cron_job)
|
||||
store.add(oneshot_job)
|
||||
recurring = store.list_recurring()
|
||||
assert len(recurring) == 1
|
||||
assert recurring[0].prompt == "Recurring"
|
||||
|
||||
def test_clear_oneshot(self, store):
|
||||
cron_job = _make_job(cron_expr="0 9 * * *")
|
||||
oneshot1 = _make_job(cron_expr=None, prompt="OS 1")
|
||||
oneshot2 = _make_job(cron_expr=None, prompt="OS 2")
|
||||
store.add(cron_job)
|
||||
store.add(oneshot1)
|
||||
store.add(oneshot2)
|
||||
removed = store.clear_oneshot()
|
||||
assert removed == 2
|
||||
remaining = store.list_all()
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].is_recurring
|
||||
|
||||
def test_is_recurring(self):
|
||||
cron = _make_job(cron_expr="*/5 * * * *")
|
||||
oneshot = _make_job(cron_expr=None)
|
||||
assert cron.is_recurring is True
|
||||
assert oneshot.is_recurring is False
|
||||
|
||||
def test_persistence(self, db_path):
|
||||
"""Jobs survive store re-creation."""
|
||||
store1 = JobStore(db_path=db_path)
|
||||
job = _make_job(prompt="Persistent job")
|
||||
store1.add(job)
|
||||
|
||||
# Create a new store instance pointing to same DB
|
||||
store2 = JobStore(db_path=db_path)
|
||||
retrieved = store2.get(job.id)
|
||||
assert retrieved is not None
|
||||
assert retrieved.prompt == "Persistent job"
|
||||
|
||||
def test_new_id_unique(self):
|
||||
ids = {JobStore.new_id() for _ in range(100)}
|
||||
assert len(ids) == 100 # all unique
|
||||
|
||||
def test_job_with_metadata(self, store):
|
||||
job = _make_job()
|
||||
job.thread_id = "T456"
|
||||
job.user_name = "Test User"
|
||||
store.add(job)
|
||||
retrieved = store.get(job.id)
|
||||
assert retrieved is not None
|
||||
assert retrieved.thread_id == "T456"
|
||||
assert retrieved.user_name == "Test User"
|
||||
|
||||
def test_telegram_channel_type(self, store):
|
||||
job = _make_job(channel_type="telegram", channel_id="987654")
|
||||
store.add(job)
|
||||
retrieved = store.get(job.id)
|
||||
assert retrieved is not None
|
||||
assert retrieved.channel_type == "telegram"
|
||||
assert retrieved.channel_id == "987654"
|
||||
Reference in New Issue
Block a user