Files
Aetheel/heartbeat/heartbeat.py
tanmay11k f7ccc153b4 feat: openclaw-style secrets (env.vars + \) and per-task model routing
- Replace python-dotenv with config.json env.vars block + \ substitution
- Add models section for per-task model routing (heartbeat, subagent, default)
- Heartbeat/subagent tasks can use different models/providers than main chat
- Remove python-dotenv from dependencies
- Update all docs to reflect new config approach
- Reorganize docs into project/ and research/ subdirectories
2026-02-21 00:40:29 -05:00

211 lines
6.5 KiB
Python

"""
Aetheel Heartbeat System
========================
Parses HEARTBEAT.md and registers periodic tasks with the Scheduler.
Each section in HEARTBEAT.md defines a schedule (natural language header)
and one or more task prompts (bullet points). The HeartbeatRunner converts
these into cron jobs that fire synthetic messages through the AI handler.
"""
import logging
import os
import re
from dataclasses import dataclass
from config import HeartbeatConfig
logger = logging.getLogger("aetheel.heartbeat")
@dataclass
class HeartbeatTask:
cron_expr: str
prompt: str
section_name: str
DEFAULT_HEARTBEAT_MD = """\
# Heartbeat Tasks
## Every 30 minutes
- Check if any scheduled reminders need attention
## Every morning (9:00 AM)
- Summarize yesterday's conversations
## Every evening (6:00 PM)
- Update MEMORY.md with today's key learnings
"""
class HeartbeatRunner:
"""Parses HEARTBEAT.md and registers tasks with the Scheduler."""
def __init__(
self,
scheduler,
ai_handler_fn,
send_fn,
config: HeartbeatConfig,
workspace_dir: str,
):
self._scheduler = scheduler
self._ai_handler = ai_handler_fn
self._send_fn = send_fn
self._config = config
self._workspace_dir = workspace_dir
self._heartbeat_path = os.path.join(workspace_dir, "HEARTBEAT.md")
def start(self) -> int:
"""Parse HEARTBEAT.md and register tasks. Returns count registered."""
if not self._config.enabled:
return 0
# Clear previous heartbeat jobs to avoid duplicates on restart
removed = self._scheduler.remove_by_channel_type("heartbeat")
if removed:
logger.info(f"Heartbeat: cleared {removed} stale job(s) from previous run")
self._ensure_heartbeat_file()
tasks = self._parse_heartbeat_md()
for task in tasks:
self._scheduler.add_cron(
cron_expr=task.cron_expr,
prompt=task.prompt,
channel_id=self._config.default_channel_id,
channel_type="heartbeat",
)
logger.info(f"Heartbeat started: {len(tasks)} task(s) registered")
return len(tasks)
def _parse_heartbeat_md(self) -> list[HeartbeatTask]:
"""Parse HEARTBEAT.md sections into HeartbeatTask objects."""
try:
with open(self._heartbeat_path, "r", encoding="utf-8") as f:
content = f.read()
except FileNotFoundError:
logger.warning(f"HEARTBEAT.md not found at {self._heartbeat_path}")
return []
except Exception as e:
logger.warning(f"Failed to read HEARTBEAT.md: {e}")
return []
tasks: list[HeartbeatTask] = []
current_header: str | None = None
current_cron: str | None = None
for line in content.splitlines():
stripped = line.strip()
# Match ## section headers (schedule definitions)
if stripped.startswith("## "):
header_text = stripped[3:].strip()
cron = self._parse_schedule_header(header_text)
if cron is not None:
current_header = header_text
current_cron = cron
else:
logger.warning(
f"Unrecognized schedule header: '{header_text}' — skipping section"
)
current_header = None
current_cron = None
continue
# Match bullet points under a valid section
if current_cron is not None and stripped.startswith("- "):
prompt = stripped[2:].strip()
if prompt:
tasks.append(
HeartbeatTask(
cron_expr=current_cron,
prompt=prompt,
section_name=current_header or "",
)
)
return tasks
def _ensure_heartbeat_file(self) -> None:
"""Create default HEARTBEAT.md if it doesn't exist."""
if os.path.exists(self._heartbeat_path):
return
os.makedirs(os.path.dirname(self._heartbeat_path), exist_ok=True)
with open(self._heartbeat_path, "w", encoding="utf-8") as f:
f.write(DEFAULT_HEARTBEAT_MD)
logger.info(f"Created default HEARTBEAT.md at {self._heartbeat_path}")
@staticmethod
def _parse_schedule_header(header: str) -> str | None:
"""Convert a natural language schedule header to a cron expression.
Supported patterns:
"Every 30 minutes" -> */30 * * * *
"Every N minutes" -> */N * * * *
"Every hour" -> 0 * * * *
"Every N hours" -> 0 */N * * *
"Every morning (9:00 AM)" -> 0 9 * * *
"Every evening (6:00 PM)" -> 0 18 * * *
Returns None for unrecognized patterns.
"""
h = header.strip()
# "Every hour"
if re.match(r"^every\s+hour$", h, re.IGNORECASE):
return "0 * * * *"
# "Every N minutes"
m = re.match(r"^every\s+(\d+)\s+minutes?$", h, re.IGNORECASE)
if m:
n = int(m.group(1))
return f"*/{n} * * * *"
# "Every N hours"
m = re.match(r"^every\s+(\d+)\s+hours?$", h, re.IGNORECASE)
if m:
n = int(m.group(1))
return f"0 */{n} * * *"
# "Every morning (H:MM AM)" or "Every morning (H AM)"
m = re.match(
r"^every\s+morning\s*\(\s*(\d{1,2})(?::(\d{2}))?\s*(AM|PM)\s*\)$",
h,
re.IGNORECASE,
)
if m:
hour = int(m.group(1))
minute = int(m.group(2)) if m.group(2) else 0
period = m.group(3).upper()
hour = _to_24h(hour, period)
return f"{minute} {hour} * * *"
# "Every evening (H:MM PM)" or "Every evening (H PM)"
m = re.match(
r"^every\s+evening\s*\(\s*(\d{1,2})(?::(\d{2}))?\s*(AM|PM)\s*\)$",
h,
re.IGNORECASE,
)
if m:
hour = int(m.group(1))
minute = int(m.group(2)) if m.group(2) else 0
period = m.group(3).upper()
hour = _to_24h(hour, period)
return f"{minute} {hour} * * *"
return None
def _to_24h(hour: int, period: str) -> int:
"""Convert 12-hour time to 24-hour."""
if period == "AM":
return 0 if hour == 12 else hour
else: # PM
return hour if hour == 12 else hour + 12