Adds Agent Swarms
* feat: streaming container mode, IPC messaging, agent teams support
Major architectural shift from single-shot container runs to long-lived
streaming containers with IPC-based message injection.
- Agent runner: query loop with AsyncIterable prompt to keep stdin open
for agent teams (fixes isSingleUserTurn premature shutdown)
- New standalone stdio MCP server (ipc-mcp-stdio.ts) inheritable by
subagents, with send_message and schedule_task tools
- Streaming output: parse OUTPUT_START/END markers in real-time, send
results to WhatsApp as they arrive
- IPC file-based messaging: host writes to ipc/{group}/input/, agent
polls for follow-up messages without respawning containers
- Per-group settings.json with CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS=1
- SDK bumped to 0.2.34 for TeamCreate tool support
- Container idle timeout (30min) with _close sentinel for shutdown
- Orphaned container cleanup on startup
- alwaysRespond flag for groups that skip trigger pattern check
- Uncaught exception/rejection handlers with timestamps in logger
- Combined SDK documentation into single deep dive reference
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* chore: remove unused ipc-mcp.ts (replaced by ipc-mcp-stdio.ts)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: clarify agent communication model in docs and tool descriptions
- CLAUDE.md (main + global): split communication instructions into
"responding to messages" vs "scheduled tasks" sections
- send_message tool: note that scheduled task output is not sent to user
- Remove structured output (outputFormat) — not needed with current flow
- Regular output is sent to WhatsApp; scheduled task output is only logged
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* chore: ignore dynamic group data while preserving base structure
Only track groups/main/CLAUDE.md and groups/global/CLAUDE.md. All other
group directories and files are ignored to prevent tracking user-specific
session data.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: resolve critical bugs in streaming container mode
Bug 1 (scheduled task hang): Task scheduler now passes onOutput callback
with idle timer that writes _close sentinel after IDLE_TIMEOUT, so
containers exit cleanly instead of blocking queue slots for 30 minutes.
Scheduled tasks stay alive for interactive follow-up via IPC.
Bug 2 (timeout disabled): Remove resetTimeout() from stderr handler.
SDK writes debug logs continuously, resetting the timer on every line.
Timeout now only resets on actual output markers in stdout.
Bug 3 (trigger bypass): Piped messages in startMessageLoop now check
trigger pattern for non-main groups. Non-trigger messages accumulate in
DB and are pulled as context via getMessagesSince when a trigger arrives.
Bug 7 (non-atomic IPC writes): GroupQueue.sendMessage uses temp file +
rename for atomic writes, matching ipc-mcp-stdio.ts pattern.
Also: flip isVerbose back to false (debug leftover), add isScheduledTask
to host-side ContainerInput interface.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: idle timer not starting + scheduled task groupFolder missing
Two bugs that prevented the scheduled task idle timeout fix from working:
1. onOutput was only called when parsed.result !== null, but session
update markers have result: null. The idle timer never started for
"silent" query completions, leaving containers parked at
waitForIpcMessage until hard timeout.
2. Scheduler's onProcess callback didn't pass groupFolder to
queue.registerProcess, so closeStdin no-oped (groupFolder was null).
The _close sentinel was never written even when the idle timer fired.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: duplicate messages and timestamp rollback in piping path
Two bugs introduced by the trigger context accumulation change:
1. processGroupMessages didn't advance lastAgentTimestamp until after
the container finished. The piping path's getMessagesSince(lastAgent
Timestamp) re-fetched messages already sent as the initial prompt,
causing duplicates.
2. processGroupMessages overwrote lastAgentTimestamp with the original
batch timestamp on completion, rolling back any advancement made by
the piping path while the container was running.
Fix: advance lastAgentTimestamp immediately after building the prompt,
before starting the container. This matches the piping path behavior
and eliminates both the overlap and the rollback.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: container idles 30 extra minutes after _close during query
When _close was detected during pollIpcDuringQuery, it was consumed
(deleted) and stream.end() was called. But after runQuery returned,
main() still emitted a session-update marker (resetting the host's idle
timer) and called waitForIpcMessage (which polled forever since _close
was already gone). The container had to wait for a second _close.
Fix: runQuery now returns closedDuringQuery. When true, main() skips
the session-update marker and waitForIpcMessage, exiting immediately.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: resume branching, internal tags, and output forwarding
- Fix resume branching: pass resumeSessionAt with last assistant UUID
to anchor each query loop resume to the correct conversation tree
position. Prevents agent responses landing on invisible branches
when agent teams subagents create parallel JSONL entries.
- Add <internal> tag stripping: agent can wrap internal reasoning in
<internal> tags which are logged but not sent to WhatsApp. Prevents
duplicate messages and internal monologue reaching users.
- Forward scheduled task output: scheduled tasks now send result text
to WhatsApp (with <internal> stripping), matching regular message
behavior. No more special-case instructions.
- Update Communication guidance in CLAUDE.md: simplified to "your
output is sent to the user or group" with soft guidance on
<internal> tags and send_message usage.
- Add messaging behavior docs to schedule_task tool: prompts the
scheduling agent to include guidance on whether the task should
always/conditionally/never message the user.
- Mount security: containerPath now optional, defaults to basename
of hostPath.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: cursor rollback on error, flush guard, verbose logging
- Roll back lastAgentTimestamp on container error so retries can
re-process the messages instead of silently losing them.
- Add guard flag to flushOutgoingQueue to prevent duplicate sends
from concurrent flushes during rapid WA reconnects.
- Revert isVerbose from hardcoded false back to env-based check
(LOG_LEVEL=debug|trace).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: orphan container cleanup was silently failing
The startup cleanup used `container ls --format {{.Names}}` which is
Docker Go-template syntax. Apple Container only supports `--format json`
or `--format table`. The command errored with exit code 64, but the
catch block silently swallowed it — orphan containers were never cleaned
up on restart.
Fixed to use `--format json` and parse `configuration.id` from the
JSON output. Also filters by `status: running` and logs a warning on
failure instead of silently catching.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* docs: add Discord badge and community section
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: idle timer reset on null results and flush queue message loss
- Only reset idle timer on actual results (non-null), not session-update
markers. Prevents containers staying alive 30 extra minutes after the
agent finishes work.
- flushOutgoingQueue now uses shift() instead of splice(0) so unattempted
messages stay in the queue if an unexpected error bails the loop.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* docs: add Agent Swarms to README
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: update Telegram skill for current architecture
Rewrite integration instructions to match the per-group queue/SQLite
architecture: remove onMessage callback pattern (store to DB, let
message loop pick up), fix startSchedulerLoop signature, add
TELEGRAM_ONLY service startup, SQLite registration, data/env/env sync,
@mention-to-trigger translation, and BotFather group privacy docs.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: Telegram skill message chunking, media placeholders, chat discovery
- Split long messages at Telegram's 4096 char limit to prevent silent
send failures
- Store placeholder text for non-text messages (photos, voice, stickers,
etc.) so the agent knows media was sent
- Update getAvailableGroups filter to include tg: chats so the agent can
discover and register Telegram chats via IPC
- Fix removal step numbering
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* docs: update REQUIREMENTS.md and SPEC.md for SQLite architecture
- Replace all registered_groups.json / sessions.json / router_state.json
references with SQLite equivalents
- Fix CONTAINER_TIMEOUT default (300000 → 1800000)
- Add missing config exports (IDLE_TIMEOUT, MAX_CONCURRENT_CONTAINERS)
- Update folder structure: add missing src files (logger, group-queue,
mount-security), remove non-existent utils.ts, list all skills
- Fix agent-runner entry (ipc-mcp.ts → ipc-mcp-stdio.ts)
- Update startup sequence to reflect per-group queue architecture
- Fix env mounting description (data/env/env, not extracted vars)
- Update troubleshooting to use sqlite3 commands
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* docs: fix README architecture description, revert SPEC.md env error
- README: update architecture blurb to mention per-group queue, add
group-queue.ts to key files, update file descriptions
- SPEC.md: restore correct credential filtering description (only auth
vars are extracted from .env, not the full file)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -23,7 +23,7 @@ export const MAIN_GROUP_FOLDER = 'main';
|
||||
export const CONTAINER_IMAGE =
|
||||
process.env.CONTAINER_IMAGE || 'nanoclaw-agent:latest';
|
||||
export const CONTAINER_TIMEOUT = parseInt(
|
||||
process.env.CONTAINER_TIMEOUT || '300000',
|
||||
process.env.CONTAINER_TIMEOUT || '1800000',
|
||||
10,
|
||||
);
|
||||
export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
|
||||
@@ -31,6 +31,10 @@ export const CONTAINER_MAX_OUTPUT_SIZE = parseInt(
|
||||
10,
|
||||
); // 10MB default
|
||||
export const IPC_POLL_INTERVAL = 1000;
|
||||
export const IDLE_TIMEOUT = parseInt(
|
||||
process.env.IDLE_TIMEOUT || '1800000',
|
||||
10,
|
||||
); // 30min default — how long to keep container alive after last result
|
||||
export const MAX_CONCURRENT_CONTAINERS = Math.max(
|
||||
1,
|
||||
parseInt(process.env.MAX_CONCURRENT_CONTAINERS || '5', 10) || 5,
|
||||
|
||||
@@ -38,17 +38,12 @@ export interface ContainerInput {
|
||||
groupFolder: string;
|
||||
chatJid: string;
|
||||
isMain: boolean;
|
||||
}
|
||||
|
||||
export interface AgentResponse {
|
||||
outputType: 'message' | 'log';
|
||||
userMessage?: string;
|
||||
internalLog?: string;
|
||||
isScheduledTask?: boolean;
|
||||
}
|
||||
|
||||
export interface ContainerOutput {
|
||||
status: 'success' | 'error';
|
||||
result: AgentResponse | null;
|
||||
result: string | null;
|
||||
newSessionId?: string;
|
||||
error?: string;
|
||||
}
|
||||
@@ -110,6 +105,29 @@ function buildVolumeMounts(
|
||||
'.claude',
|
||||
);
|
||||
fs.mkdirSync(groupSessionsDir, { recursive: true });
|
||||
const settingsFile = path.join(groupSessionsDir, 'settings.json');
|
||||
if (!fs.existsSync(settingsFile)) {
|
||||
fs.writeFileSync(settingsFile, JSON.stringify({
|
||||
env: { CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS: '1' },
|
||||
}, null, 2) + '\n');
|
||||
}
|
||||
|
||||
// Sync skills from container/skills/ into each group's .claude/skills/
|
||||
const skillsSrc = path.join(process.cwd(), 'container', 'skills');
|
||||
const skillsDst = path.join(groupSessionsDir, 'skills');
|
||||
if (fs.existsSync(skillsSrc)) {
|
||||
for (const skillDir of fs.readdirSync(skillsSrc)) {
|
||||
const srcDir = path.join(skillsSrc, skillDir);
|
||||
if (!fs.statSync(srcDir).isDirectory()) continue;
|
||||
const dstDir = path.join(skillsDst, skillDir);
|
||||
fs.mkdirSync(dstDir, { recursive: true });
|
||||
for (const file of fs.readdirSync(srcDir)) {
|
||||
const srcFile = path.join(srcDir, file);
|
||||
const dstFile = path.join(dstDir, file);
|
||||
fs.copyFileSync(srcFile, dstFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
mounts.push({
|
||||
hostPath: groupSessionsDir,
|
||||
containerPath: '/home/node/.claude',
|
||||
@@ -121,6 +139,7 @@ function buildVolumeMounts(
|
||||
const groupIpcDir = path.join(DATA_DIR, 'ipc', group.folder);
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'messages'), { recursive: true });
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'tasks'), { recursive: true });
|
||||
fs.mkdirSync(path.join(groupIpcDir, 'input'), { recursive: true });
|
||||
mounts.push({
|
||||
hostPath: groupIpcDir,
|
||||
containerPath: '/workspace/ipc',
|
||||
@@ -154,6 +173,15 @@ function buildVolumeMounts(
|
||||
}
|
||||
}
|
||||
|
||||
// Mount agent-runner source from host — recompiled on container startup.
|
||||
// Bypasses Apple Container's sticky build cache for code changes.
|
||||
const agentRunnerSrc = path.join(projectRoot, 'container', 'agent-runner', 'src');
|
||||
mounts.push({
|
||||
hostPath: agentRunnerSrc,
|
||||
containerPath: '/app/src',
|
||||
readonly: true,
|
||||
});
|
||||
|
||||
// Additional mounts validated against external allowlist (tamper-proof from containers)
|
||||
if (group.containerConfig?.additionalMounts) {
|
||||
const validatedMounts = validateAdditionalMounts(
|
||||
@@ -191,6 +219,7 @@ export async function runContainerAgent(
|
||||
group: RegisteredGroup,
|
||||
input: ContainerInput,
|
||||
onProcess: (proc: ChildProcess, containerName: string) => void,
|
||||
onOutput?: (output: ContainerOutput) => Promise<void>,
|
||||
): Promise<ContainerOutput> {
|
||||
const startTime = Date.now();
|
||||
|
||||
@@ -240,22 +269,63 @@ export async function runContainerAgent(
|
||||
let stdoutTruncated = false;
|
||||
let stderrTruncated = false;
|
||||
|
||||
// Write input and close stdin (Apple Container doesn't flush pipe without EOF)
|
||||
container.stdin.write(JSON.stringify(input));
|
||||
container.stdin.end();
|
||||
|
||||
// Streaming output: parse OUTPUT_START/END marker pairs as they arrive
|
||||
let parseBuffer = '';
|
||||
let newSessionId: string | undefined;
|
||||
let outputChain = Promise.resolve();
|
||||
|
||||
container.stdout.on('data', (data) => {
|
||||
if (stdoutTruncated) return;
|
||||
const chunk = data.toString();
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stdout.length;
|
||||
if (chunk.length > remaining) {
|
||||
stdout += chunk.slice(0, remaining);
|
||||
stdoutTruncated = true;
|
||||
logger.warn(
|
||||
{ group: group.name, size: stdout.length },
|
||||
'Container stdout truncated due to size limit',
|
||||
);
|
||||
} else {
|
||||
stdout += chunk;
|
||||
|
||||
// Always accumulate for logging
|
||||
if (!stdoutTruncated) {
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stdout.length;
|
||||
if (chunk.length > remaining) {
|
||||
stdout += chunk.slice(0, remaining);
|
||||
stdoutTruncated = true;
|
||||
logger.warn(
|
||||
{ group: group.name, size: stdout.length },
|
||||
'Container stdout truncated due to size limit',
|
||||
);
|
||||
} else {
|
||||
stdout += chunk;
|
||||
}
|
||||
}
|
||||
|
||||
// Stream-parse for output markers
|
||||
if (onOutput) {
|
||||
parseBuffer += chunk;
|
||||
let startIdx: number;
|
||||
while ((startIdx = parseBuffer.indexOf(OUTPUT_START_MARKER)) !== -1) {
|
||||
const endIdx = parseBuffer.indexOf(OUTPUT_END_MARKER, startIdx);
|
||||
if (endIdx === -1) break; // Incomplete pair, wait for more data
|
||||
|
||||
const jsonStr = parseBuffer
|
||||
.slice(startIdx + OUTPUT_START_MARKER.length, endIdx)
|
||||
.trim();
|
||||
parseBuffer = parseBuffer.slice(endIdx + OUTPUT_END_MARKER.length);
|
||||
|
||||
try {
|
||||
const parsed: ContainerOutput = JSON.parse(jsonStr);
|
||||
if (parsed.newSessionId) {
|
||||
newSessionId = parsed.newSessionId;
|
||||
}
|
||||
// Activity detected — reset the hard timeout
|
||||
resetTimeout();
|
||||
// Call onOutput for all markers (including null results)
|
||||
// so idle timers start even for "silent" query completions.
|
||||
outputChain = outputChain.then(() => onOutput(parsed));
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
{ group: group.name, error: err },
|
||||
'Failed to parse streamed output chunk',
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -265,6 +335,8 @@ export async function runContainerAgent(
|
||||
for (const line of lines) {
|
||||
if (line) logger.debug({ container: group.folder }, line);
|
||||
}
|
||||
// Don't reset timeout on stderr — SDK writes debug logs continuously.
|
||||
// Timeout only resets on actual output (OUTPUT_MARKER in stdout).
|
||||
if (stderrTruncated) return;
|
||||
const remaining = CONTAINER_MAX_OUTPUT_SIZE - stderr.length;
|
||||
if (chunk.length > remaining) {
|
||||
@@ -280,18 +352,26 @@ export async function runContainerAgent(
|
||||
});
|
||||
|
||||
let timedOut = false;
|
||||
const timeoutMs = group.containerConfig?.timeout || CONTAINER_TIMEOUT;
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
const killOnTimeout = () => {
|
||||
timedOut = true;
|
||||
logger.error({ group: group.name, containerName }, 'Container timeout, stopping gracefully');
|
||||
// Graceful stop: sends SIGTERM, waits, then SIGKILL — lets --rm fire
|
||||
exec(`container stop ${containerName}`, { timeout: 15000 }, (err) => {
|
||||
if (err) {
|
||||
logger.warn({ group: group.name, containerName, err }, 'Graceful stop failed, force killing');
|
||||
container.kill('SIGKILL');
|
||||
}
|
||||
});
|
||||
}, group.containerConfig?.timeout || CONTAINER_TIMEOUT);
|
||||
};
|
||||
|
||||
let timeout = setTimeout(killOnTimeout, timeoutMs);
|
||||
|
||||
// Reset the timeout whenever there's activity (streaming output)
|
||||
const resetTimeout = () => {
|
||||
clearTimeout(timeout);
|
||||
timeout = setTimeout(killOnTimeout, timeoutMs);
|
||||
};
|
||||
|
||||
container.on('close', (code) => {
|
||||
clearTimeout(timeout);
|
||||
@@ -324,8 +404,7 @@ export async function runContainerAgent(
|
||||
|
||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
|
||||
const logFile = path.join(logsDir, `container-${timestamp}.log`);
|
||||
const isVerbose =
|
||||
process.env.LOG_LEVEL === 'debug' || process.env.LOG_LEVEL === 'trace';
|
||||
const isVerbose = process.env.LOG_LEVEL === 'debug' || process.env.LOG_LEVEL === 'trace';
|
||||
|
||||
const logLines = [
|
||||
`=== Container Run Log ===`,
|
||||
@@ -401,6 +480,23 @@ export async function runContainerAgent(
|
||||
return;
|
||||
}
|
||||
|
||||
// Streaming mode: wait for output chain to settle, return completion marker
|
||||
if (onOutput) {
|
||||
outputChain.then(() => {
|
||||
logger.info(
|
||||
{ group: group.name, duration, newSessionId },
|
||||
'Container completed (streaming mode)',
|
||||
);
|
||||
resolve({
|
||||
status: 'success',
|
||||
result: null,
|
||||
newSessionId,
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Legacy mode: parse the last output marker pair from accumulated stdout
|
||||
try {
|
||||
// Extract JSON between sentinel markers for robust parsing
|
||||
const startIdx = stdout.indexOf(OUTPUT_START_MARKER);
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { ChildProcess, exec } from 'child_process';
|
||||
import { ChildProcess } from 'child_process';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { MAX_CONCURRENT_CONTAINERS } from './config.js';
|
||||
import { DATA_DIR, MAX_CONCURRENT_CONTAINERS } from './config.js';
|
||||
import { logger } from './logger.js';
|
||||
|
||||
interface QueuedTask {
|
||||
@@ -18,6 +20,7 @@ interface GroupState {
|
||||
pendingTasks: QueuedTask[];
|
||||
process: ChildProcess | null;
|
||||
containerName: string | null;
|
||||
groupFolder: string | null;
|
||||
retryCount: number;
|
||||
}
|
||||
|
||||
@@ -38,6 +41,7 @@ export class GroupQueue {
|
||||
pendingTasks: [],
|
||||
process: null,
|
||||
containerName: null,
|
||||
groupFolder: null,
|
||||
retryCount: 0,
|
||||
};
|
||||
this.groups.set(groupJid, state);
|
||||
@@ -108,10 +112,49 @@ export class GroupQueue {
|
||||
this.runTask(groupJid, { id: taskId, groupJid, fn });
|
||||
}
|
||||
|
||||
registerProcess(groupJid: string, proc: ChildProcess, containerName: string): void {
|
||||
registerProcess(groupJid: string, proc: ChildProcess, containerName: string, groupFolder?: string): void {
|
||||
const state = this.getGroup(groupJid);
|
||||
state.process = proc;
|
||||
state.containerName = containerName;
|
||||
if (groupFolder) state.groupFolder = groupFolder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a follow-up message to the active container via IPC file.
|
||||
* Returns true if the message was written, false if no active container.
|
||||
*/
|
||||
sendMessage(groupJid: string, text: string): boolean {
|
||||
const state = this.getGroup(groupJid);
|
||||
if (!state.active || !state.groupFolder) return false;
|
||||
|
||||
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
|
||||
try {
|
||||
fs.mkdirSync(inputDir, { recursive: true });
|
||||
const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 6)}.json`;
|
||||
const filepath = path.join(inputDir, filename);
|
||||
const tempPath = `${filepath}.tmp`;
|
||||
fs.writeFileSync(tempPath, JSON.stringify({ type: 'message', text }));
|
||||
fs.renameSync(tempPath, filepath);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal the active container to wind down by writing a close sentinel.
|
||||
*/
|
||||
closeStdin(groupJid: string): void {
|
||||
const state = this.getGroup(groupJid);
|
||||
if (!state.active || !state.groupFolder) return;
|
||||
|
||||
const inputDir = path.join(DATA_DIR, 'ipc', state.groupFolder, 'input');
|
||||
try {
|
||||
fs.mkdirSync(inputDir, { recursive: true });
|
||||
fs.writeFileSync(path.join(inputDir, '_close'), '');
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private async runForGroup(
|
||||
@@ -144,6 +187,7 @@ export class GroupQueue {
|
||||
state.active = false;
|
||||
state.process = null;
|
||||
state.containerName = null;
|
||||
state.groupFolder = null;
|
||||
this.activeCount--;
|
||||
this.drainGroup(groupJid);
|
||||
}
|
||||
@@ -167,6 +211,7 @@ export class GroupQueue {
|
||||
state.active = false;
|
||||
state.process = null;
|
||||
state.containerName = null;
|
||||
state.groupFolder = null;
|
||||
this.activeCount--;
|
||||
this.drainGroup(groupJid);
|
||||
}
|
||||
@@ -236,65 +281,22 @@ export class GroupQueue {
|
||||
}
|
||||
}
|
||||
|
||||
async shutdown(gracePeriodMs: number): Promise<void> {
|
||||
async shutdown(_gracePeriodMs: number): Promise<void> {
|
||||
this.shuttingDown = true;
|
||||
logger.info(
|
||||
{ activeCount: this.activeCount, gracePeriodMs },
|
||||
'GroupQueue shutting down',
|
||||
);
|
||||
|
||||
// Collect all active processes
|
||||
const activeProcs: Array<{ jid: string; proc: ChildProcess; containerName: string | null }> = [];
|
||||
// Count active containers but don't kill them — they'll finish on their own
|
||||
// via idle timeout or container timeout. The --rm flag cleans them up on exit.
|
||||
// This prevents WhatsApp reconnection restarts from killing working agents.
|
||||
const activeContainers: string[] = [];
|
||||
for (const [jid, state] of this.groups) {
|
||||
if (state.process && !state.process.killed) {
|
||||
activeProcs.push({ jid, proc: state.process, containerName: state.containerName });
|
||||
if (state.process && !state.process.killed && state.containerName) {
|
||||
activeContainers.push(state.containerName);
|
||||
}
|
||||
}
|
||||
|
||||
if (activeProcs.length === 0) return;
|
||||
|
||||
// Stop all active containers gracefully
|
||||
for (const { jid, proc, containerName } of activeProcs) {
|
||||
if (containerName) {
|
||||
// Defense-in-depth: re-sanitize before shell interpolation.
|
||||
// Primary sanitization is in container-runner.ts when building the name,
|
||||
// but we sanitize again here since exec() runs through a shell.
|
||||
const safeName = containerName.replace(/[^a-zA-Z0-9-]/g, '');
|
||||
logger.info({ jid, containerName: safeName }, 'Stopping container');
|
||||
exec(`container stop ${safeName}`, (err) => {
|
||||
if (err) {
|
||||
logger.warn({ jid, containerName: safeName, err: err.message }, 'container stop failed');
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.info({ jid, pid: proc.pid }, 'Sending SIGTERM to process');
|
||||
proc.kill('SIGTERM');
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for grace period
|
||||
await new Promise<void>((resolve) => {
|
||||
const checkInterval = setInterval(() => {
|
||||
const alive = activeProcs.filter(
|
||||
({ proc }) => !proc.killed && proc.exitCode === null,
|
||||
);
|
||||
if (alive.length === 0) {
|
||||
clearInterval(checkInterval);
|
||||
resolve();
|
||||
}
|
||||
}, 500);
|
||||
|
||||
setTimeout(() => {
|
||||
clearInterval(checkInterval);
|
||||
// SIGKILL survivors
|
||||
for (const { jid, proc } of activeProcs) {
|
||||
if (!proc.killed && proc.exitCode === null) {
|
||||
logger.warn({ jid, pid: proc.pid }, 'Sending SIGKILL to container');
|
||||
proc.kill('SIGKILL');
|
||||
}
|
||||
}
|
||||
resolve();
|
||||
}, gracePeriodMs);
|
||||
});
|
||||
logger.info(
|
||||
{ activeCount: this.activeCount, detachedContainers: activeContainers },
|
||||
'GroupQueue shutting down (containers detached, not killed)',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
250
src/index.ts
250
src/index.ts
@@ -13,6 +13,7 @@ import { CronExpressionParser } from 'cron-parser';
|
||||
import {
|
||||
ASSISTANT_NAME,
|
||||
DATA_DIR,
|
||||
IDLE_TIMEOUT,
|
||||
IPC_POLL_INTERVAL,
|
||||
MAIN_GROUP_FOLDER,
|
||||
POLL_INTERVAL,
|
||||
@@ -21,8 +22,8 @@ import {
|
||||
TRIGGER_PATTERN,
|
||||
} from './config.js';
|
||||
import {
|
||||
AgentResponse,
|
||||
AvailableGroup,
|
||||
ContainerOutput,
|
||||
runContainerAgent,
|
||||
writeGroupsSnapshot,
|
||||
writeTasksSnapshot,
|
||||
@@ -51,7 +52,7 @@ import {
|
||||
} from './db.js';
|
||||
import { GroupQueue } from './group-queue.js';
|
||||
import { startSchedulerLoop } from './task-scheduler.js';
|
||||
import { RegisteredGroup } from './types.js';
|
||||
import { NewMessage, RegisteredGroup } from './types.js';
|
||||
import { logger } from './logger.js';
|
||||
|
||||
const GROUP_SYNC_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
|
||||
@@ -67,6 +68,9 @@ let lidToPhoneMap: Record<string, string> = {};
|
||||
let messageLoopRunning = false;
|
||||
let ipcWatcherRunning = false;
|
||||
let groupSyncTimerStarted = false;
|
||||
// WhatsApp connection state and outgoing message queue
|
||||
let waConnected = false;
|
||||
const outgoingQueue: Array<{ jid: string; text: string }> = [];
|
||||
|
||||
const queue = new GroupQueue();
|
||||
|
||||
@@ -189,9 +193,28 @@ function getAvailableGroups(): AvailableGroup[] {
|
||||
}));
|
||||
}
|
||||
|
||||
function escapeXml(s: string): string {
|
||||
return s
|
||||
.replace(/&/g, '&')
|
||||
.replace(/</g, '<')
|
||||
.replace(/>/g, '>')
|
||||
.replace(/"/g, '"');
|
||||
}
|
||||
|
||||
function formatMessages(messages: NewMessage[]): string {
|
||||
const lines = messages.map((m) =>
|
||||
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`,
|
||||
);
|
||||
return `<messages>\n${lines.join('\n')}\n</messages>`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all pending messages for a group.
|
||||
* Called by the GroupQueue when it's this group's turn.
|
||||
*
|
||||
* Uses streaming output: agent results are sent to WhatsApp as they arrive.
|
||||
* The container stays alive for IDLE_TIMEOUT after each result, allowing
|
||||
* rapid-fire messages to be piped in without spawning a new container.
|
||||
*/
|
||||
async function processGroupMessages(chatJid: string): Promise<boolean> {
|
||||
const group = registeredGroups[chatJid];
|
||||
@@ -217,47 +240,64 @@ async function processGroupMessages(chatJid: string): Promise<boolean> {
|
||||
if (!hasTrigger) return true;
|
||||
}
|
||||
|
||||
const lines = missedMessages.map((m) => {
|
||||
const escapeXml = (s: string) =>
|
||||
s
|
||||
.replace(/&/g, '&')
|
||||
.replace(/</g, '<')
|
||||
.replace(/>/g, '>')
|
||||
.replace(/"/g, '"');
|
||||
return `<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`;
|
||||
});
|
||||
const prompt = `<messages>\n${lines.join('\n')}\n</messages>`;
|
||||
const prompt = formatMessages(missedMessages);
|
||||
|
||||
// Advance cursor so the piping path in startMessageLoop won't re-fetch
|
||||
// these messages. Save the old cursor so we can roll back on error.
|
||||
const previousCursor = lastAgentTimestamp[chatJid] || '';
|
||||
lastAgentTimestamp[chatJid] =
|
||||
missedMessages[missedMessages.length - 1].timestamp;
|
||||
saveState();
|
||||
|
||||
logger.info(
|
||||
{ group: group.name, messageCount: missedMessages.length },
|
||||
'Processing messages',
|
||||
);
|
||||
|
||||
// Track idle timer for closing stdin when agent is idle
|
||||
let idleTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
const resetIdleTimer = () => {
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
idleTimer = setTimeout(() => {
|
||||
logger.debug({ group: group.name }, 'Idle timeout, closing container stdin');
|
||||
queue.closeStdin(chatJid);
|
||||
}, IDLE_TIMEOUT);
|
||||
};
|
||||
|
||||
await setTyping(chatJid, true);
|
||||
const response = await runAgent(group, prompt, chatJid);
|
||||
let hadError = false;
|
||||
|
||||
const output = await runAgent(group, prompt, chatJid, async (result) => {
|
||||
// Streaming output callback — called for each agent result
|
||||
if (result.result) {
|
||||
const raw = typeof result.result === 'string' ? result.result : JSON.stringify(result.result);
|
||||
// Strip <internal>...</internal> blocks — agent uses these for internal reasoning
|
||||
const text = raw.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
|
||||
logger.info({ group: group.name }, `Agent output: ${raw.slice(0, 200)}`);
|
||||
if (text) {
|
||||
await sendMessage(chatJid, `${ASSISTANT_NAME}: ${text}`);
|
||||
}
|
||||
// Only reset idle timer on actual results, not session-update markers (result: null)
|
||||
resetIdleTimer();
|
||||
}
|
||||
|
||||
if (result.status === 'error') {
|
||||
hadError = true;
|
||||
}
|
||||
});
|
||||
|
||||
await setTyping(chatJid, false);
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
|
||||
if (response === 'error') {
|
||||
// Container or agent error — signal failure so queue can retry with backoff
|
||||
if (output === 'error' || hadError) {
|
||||
// Roll back cursor so retries can re-process these messages
|
||||
lastAgentTimestamp[chatJid] = previousCursor;
|
||||
saveState();
|
||||
logger.warn({ group: group.name }, 'Agent error, rolled back message cursor for retry');
|
||||
return false;
|
||||
}
|
||||
|
||||
// Agent processed messages successfully (whether it responded or stayed silent)
|
||||
lastAgentTimestamp[chatJid] =
|
||||
missedMessages[missedMessages.length - 1].timestamp;
|
||||
saveState();
|
||||
|
||||
if (response.outputType === 'message' && response.userMessage) {
|
||||
await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response.userMessage}`);
|
||||
}
|
||||
|
||||
if (response.internalLog) {
|
||||
logger.info(
|
||||
{ group: group.name, outputType: response.outputType },
|
||||
`Agent: ${response.internalLog}`,
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -265,7 +305,8 @@ async function runAgent(
|
||||
group: RegisteredGroup,
|
||||
prompt: string,
|
||||
chatJid: string,
|
||||
): Promise<AgentResponse | 'error'> {
|
||||
onOutput?: (output: ContainerOutput) => Promise<void>,
|
||||
): Promise<'success' | 'error'> {
|
||||
const isMain = group.folder === MAIN_GROUP_FOLDER;
|
||||
const sessionId = sessions[group.folder];
|
||||
|
||||
@@ -294,6 +335,17 @@ async function runAgent(
|
||||
new Set(Object.keys(registeredGroups)),
|
||||
);
|
||||
|
||||
// Wrap onOutput to track session ID from streamed results
|
||||
const wrappedOnOutput = onOutput
|
||||
? async (output: ContainerOutput) => {
|
||||
if (output.newSessionId) {
|
||||
sessions[group.folder] = output.newSessionId;
|
||||
setSession(group.folder, output.newSessionId);
|
||||
}
|
||||
await onOutput(output);
|
||||
}
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
const output = await runContainerAgent(
|
||||
group,
|
||||
@@ -304,7 +356,8 @@ async function runAgent(
|
||||
chatJid,
|
||||
isMain,
|
||||
},
|
||||
(proc, containerName) => queue.registerProcess(chatJid, proc, containerName),
|
||||
(proc, containerName) => queue.registerProcess(chatJid, proc, containerName, group.folder),
|
||||
wrappedOnOutput,
|
||||
);
|
||||
|
||||
if (output.newSessionId) {
|
||||
@@ -320,7 +373,7 @@ async function runAgent(
|
||||
return 'error';
|
||||
}
|
||||
|
||||
return output.result ?? { outputType: 'log' };
|
||||
return 'success';
|
||||
} catch (err) {
|
||||
logger.error({ group: group.name, err }, 'Agent error');
|
||||
return 'error';
|
||||
@@ -328,11 +381,36 @@ async function runAgent(
|
||||
}
|
||||
|
||||
async function sendMessage(jid: string, text: string): Promise<void> {
|
||||
if (!waConnected) {
|
||||
outgoingQueue.push({ jid, text });
|
||||
logger.info({ jid, length: text.length, queueSize: outgoingQueue.length }, 'WA disconnected, message queued');
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await sock.sendMessage(jid, { text });
|
||||
logger.info({ jid, length: text.length }, 'Message sent');
|
||||
} catch (err) {
|
||||
logger.error({ jid, err }, 'Failed to send message');
|
||||
// If send fails, queue it for retry on reconnect
|
||||
outgoingQueue.push({ jid, text });
|
||||
logger.warn({ jid, err, queueSize: outgoingQueue.length }, 'Failed to send, message queued');
|
||||
}
|
||||
}
|
||||
|
||||
let flushing = false;
|
||||
async function flushOutgoingQueue(): Promise<void> {
|
||||
if (flushing || outgoingQueue.length === 0) return;
|
||||
flushing = true;
|
||||
try {
|
||||
logger.info({ count: outgoingQueue.length }, 'Flushing outgoing message queue');
|
||||
// Process one at a time — sendMessage re-queues on failure internally.
|
||||
// Shift instead of splice so unattempted messages stay in the queue
|
||||
// if an unexpected error occurs.
|
||||
while (outgoingQueue.length > 0) {
|
||||
const item = outgoingQueue.shift()!;
|
||||
await sendMessage(item.jid, item.text);
|
||||
}
|
||||
} finally {
|
||||
flushing = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -710,18 +788,27 @@ async function connectWhatsApp(): Promise<void> {
|
||||
}
|
||||
|
||||
if (connection === 'close') {
|
||||
waConnected = false;
|
||||
const reason = (lastDisconnect?.error as any)?.output?.statusCode;
|
||||
const shouldReconnect = reason !== DisconnectReason.loggedOut;
|
||||
logger.info({ reason, shouldReconnect }, 'Connection closed');
|
||||
logger.info({ reason, shouldReconnect, queuedMessages: outgoingQueue.length }, 'Connection closed');
|
||||
|
||||
if (shouldReconnect) {
|
||||
logger.info('Reconnecting...');
|
||||
connectWhatsApp();
|
||||
connectWhatsApp().catch((err) => {
|
||||
logger.error({ err }, 'Failed to reconnect, retrying in 5s');
|
||||
setTimeout(() => {
|
||||
connectWhatsApp().catch((err2) => {
|
||||
logger.error({ err: err2 }, 'Reconnection retry failed');
|
||||
});
|
||||
}, 5000);
|
||||
});
|
||||
} else {
|
||||
logger.info('Logged out. Run /setup to re-authenticate.');
|
||||
process.exit(0);
|
||||
}
|
||||
} else if (connection === 'open') {
|
||||
waConnected = true;
|
||||
logger.info('Connected to WhatsApp');
|
||||
|
||||
// Build LID to phone mapping from auth state for self-chat translation
|
||||
@@ -734,6 +821,11 @@ async function connectWhatsApp(): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any messages queued while disconnected
|
||||
flushOutgoingQueue().catch((err) =>
|
||||
logger.error({ err }, 'Failed to flush outgoing queue'),
|
||||
);
|
||||
|
||||
// Sync group metadata on startup (respects 24h cache)
|
||||
syncGroupMetadata().catch((err) =>
|
||||
logger.error({ err }, 'Initial group sync failed'),
|
||||
@@ -748,11 +840,12 @@ async function connectWhatsApp(): Promise<void> {
|
||||
}, GROUP_SYNC_INTERVAL_MS);
|
||||
}
|
||||
startSchedulerLoop({
|
||||
sendMessage,
|
||||
registeredGroups: () => registeredGroups,
|
||||
getSessions: () => sessions,
|
||||
queue,
|
||||
onProcess: (groupJid, proc, containerName) => queue.registerProcess(groupJid, proc, containerName),
|
||||
onProcess: (groupJid, proc, containerName, groupFolder) => queue.registerProcess(groupJid, proc, containerName, groupFolder),
|
||||
sendMessage,
|
||||
assistantName: ASSISTANT_NAME,
|
||||
});
|
||||
startIpcWatcher();
|
||||
queue.setProcessMessagesFn(processGroupMessages);
|
||||
@@ -817,14 +910,57 @@ async function startMessageLoop(): Promise<void> {
|
||||
lastTimestamp = newTimestamp;
|
||||
saveState();
|
||||
|
||||
// Deduplicate by group and enqueue
|
||||
const groupsWithMessages = new Set<string>();
|
||||
// Deduplicate by group
|
||||
const messagesByGroup = new Map<string, NewMessage[]>();
|
||||
for (const msg of messages) {
|
||||
groupsWithMessages.add(msg.chat_jid);
|
||||
const existing = messagesByGroup.get(msg.chat_jid);
|
||||
if (existing) {
|
||||
existing.push(msg);
|
||||
} else {
|
||||
messagesByGroup.set(msg.chat_jid, [msg]);
|
||||
}
|
||||
}
|
||||
|
||||
for (const chatJid of groupsWithMessages) {
|
||||
queue.enqueueMessageCheck(chatJid);
|
||||
for (const [chatJid, groupMessages] of messagesByGroup) {
|
||||
const group = registeredGroups[chatJid];
|
||||
if (!group) continue;
|
||||
|
||||
const isMainGroup = group.folder === MAIN_GROUP_FOLDER;
|
||||
const needsTrigger = !isMainGroup && group.requiresTrigger !== false;
|
||||
|
||||
// For non-main groups, only act on trigger messages.
|
||||
// Non-trigger messages accumulate in DB and get pulled as
|
||||
// context when a trigger eventually arrives.
|
||||
if (needsTrigger) {
|
||||
const hasTrigger = groupMessages.some((m) =>
|
||||
TRIGGER_PATTERN.test(m.content.trim()),
|
||||
);
|
||||
if (!hasTrigger) continue;
|
||||
}
|
||||
|
||||
// Pull all messages since lastAgentTimestamp so non-trigger
|
||||
// context that accumulated between triggers is included.
|
||||
const allPending = getMessagesSince(
|
||||
chatJid,
|
||||
lastAgentTimestamp[chatJid] || '',
|
||||
ASSISTANT_NAME,
|
||||
);
|
||||
const messagesToSend =
|
||||
allPending.length > 0 ? allPending : groupMessages;
|
||||
const formatted = formatMessages(messagesToSend);
|
||||
|
||||
if (queue.sendMessage(chatJid, formatted)) {
|
||||
logger.debug(
|
||||
{ chatJid, count: messagesToSend.length },
|
||||
'Piped messages to active container',
|
||||
);
|
||||
lastAgentTimestamp[chatJid] =
|
||||
messagesToSend[messagesToSend.length - 1].timestamp;
|
||||
saveState();
|
||||
} else {
|
||||
// No active container — enqueue for a new one
|
||||
queue.enqueueMessageCheck(chatJid);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
@@ -891,22 +1027,26 @@ function ensureContainerSystemRunning(): void {
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stopped NanoClaw containers from previous runs
|
||||
// Kill and clean up orphaned NanoClaw containers from previous runs
|
||||
try {
|
||||
const output = execSync('container ls -a --format {{.Names}}', {
|
||||
const output = execSync('container ls --format json', {
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
encoding: 'utf-8',
|
||||
});
|
||||
const stale = output
|
||||
.split('\n')
|
||||
.map((n) => n.trim())
|
||||
.filter((n) => n.startsWith('nanoclaw-'));
|
||||
if (stale.length > 0) {
|
||||
execSync(`container rm ${stale.join(' ')}`, { stdio: 'pipe' });
|
||||
logger.info({ count: stale.length }, 'Cleaned up stopped containers');
|
||||
const containers: { status: string; configuration: { id: string } }[] = JSON.parse(output || '[]');
|
||||
const orphans = containers
|
||||
.filter((c) => c.status === 'running' && c.configuration.id.startsWith('nanoclaw-'))
|
||||
.map((c) => c.configuration.id);
|
||||
for (const name of orphans) {
|
||||
try {
|
||||
execSync(`container stop ${name}`, { stdio: 'pipe' });
|
||||
} catch { /* already stopped */ }
|
||||
}
|
||||
} catch {
|
||||
// No stopped containers or ls/rm not supported
|
||||
if (orphans.length > 0) {
|
||||
logger.info({ count: orphans.length, names: orphans }, 'Stopped orphaned containers');
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn({ err }, 'Failed to clean up orphaned containers');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,3 +4,13 @@ export const logger = pino({
|
||||
level: process.env.LOG_LEVEL || 'info',
|
||||
transport: { target: 'pino-pretty', options: { colorize: true } },
|
||||
});
|
||||
|
||||
// Route uncaught errors through pino so they get timestamps in stderr
|
||||
process.on('uncaughtException', (err) => {
|
||||
logger.fatal({ err }, 'Uncaught exception');
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
process.on('unhandledRejection', (reason) => {
|
||||
logger.error({ err: reason }, 'Unhandled rejection');
|
||||
});
|
||||
|
||||
@@ -221,6 +221,7 @@ export interface MountValidationResult {
|
||||
allowed: boolean;
|
||||
reason: string;
|
||||
realHostPath?: string;
|
||||
resolvedContainerPath?: string;
|
||||
effectiveReadonly?: boolean;
|
||||
}
|
||||
|
||||
@@ -242,11 +243,14 @@ export function validateMount(
|
||||
};
|
||||
}
|
||||
|
||||
// Validate container path first (cheap check)
|
||||
if (!isValidContainerPath(mount.containerPath)) {
|
||||
// Derive containerPath from hostPath basename if not specified
|
||||
const containerPath = mount.containerPath || path.basename(mount.hostPath);
|
||||
|
||||
// Validate container path (cheap check)
|
||||
if (!isValidContainerPath(containerPath)) {
|
||||
return {
|
||||
allowed: false,
|
||||
reason: `Invalid container path: "${mount.containerPath}" - must be relative, non-empty, and not contain ".."`,
|
||||
reason: `Invalid container path: "${containerPath}" - must be relative, non-empty, and not contain ".."`,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -318,6 +322,7 @@ export function validateMount(
|
||||
allowed: true,
|
||||
reason: `Allowed under root "${allowedRoot.path}"${allowedRoot.description ? ` (${allowedRoot.description})` : ''}`,
|
||||
realHostPath: realPath,
|
||||
resolvedContainerPath: containerPath,
|
||||
effectiveReadonly,
|
||||
};
|
||||
}
|
||||
@@ -348,7 +353,7 @@ export function validateAdditionalMounts(
|
||||
if (result.allowed) {
|
||||
validatedMounts.push({
|
||||
hostPath: result.realHostPath!,
|
||||
containerPath: `/workspace/extra/${mount.containerPath}`,
|
||||
containerPath: `/workspace/extra/${result.resolvedContainerPath}`,
|
||||
readonly: result.effectiveReadonly!,
|
||||
});
|
||||
|
||||
@@ -356,7 +361,7 @@ export function validateAdditionalMounts(
|
||||
{
|
||||
group: groupName,
|
||||
hostPath: result.realHostPath,
|
||||
containerPath: mount.containerPath,
|
||||
containerPath: result.resolvedContainerPath,
|
||||
readonly: result.effectiveReadonly,
|
||||
reason: result.reason,
|
||||
},
|
||||
|
||||
@@ -4,13 +4,13 @@ import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import {
|
||||
ASSISTANT_NAME,
|
||||
GROUPS_DIR,
|
||||
IDLE_TIMEOUT,
|
||||
MAIN_GROUP_FOLDER,
|
||||
SCHEDULER_POLL_INTERVAL,
|
||||
TIMEZONE,
|
||||
} from './config.js';
|
||||
import { runContainerAgent, writeTasksSnapshot } from './container-runner.js';
|
||||
import { ContainerOutput, runContainerAgent, writeTasksSnapshot } from './container-runner.js';
|
||||
import {
|
||||
getAllTasks,
|
||||
getDueTasks,
|
||||
@@ -23,11 +23,12 @@ import { logger } from './logger.js';
|
||||
import { RegisteredGroup, ScheduledTask } from './types.js';
|
||||
|
||||
export interface SchedulerDependencies {
|
||||
sendMessage: (jid: string, text: string) => Promise<void>;
|
||||
registeredGroups: () => Record<string, RegisteredGroup>;
|
||||
getSessions: () => Record<string, string>;
|
||||
queue: GroupQueue;
|
||||
onProcess: (groupJid: string, proc: ChildProcess, containerName: string) => void;
|
||||
onProcess: (groupJid: string, proc: ChildProcess, containerName: string, groupFolder: string) => void;
|
||||
sendMessage: (jid: string, text: string) => Promise<void>;
|
||||
assistantName: string;
|
||||
}
|
||||
|
||||
async function runTask(
|
||||
@@ -89,6 +90,18 @@ async function runTask(
|
||||
const sessionId =
|
||||
task.context_mode === 'group' ? sessions[task.group_folder] : undefined;
|
||||
|
||||
// Idle timer: writes _close sentinel after IDLE_TIMEOUT of no output,
|
||||
// so the container exits instead of hanging at waitForIpcMessage forever.
|
||||
let idleTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
const resetIdleTimer = () => {
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
idleTimer = setTimeout(() => {
|
||||
logger.debug({ taskId: task.id }, 'Scheduled task idle timeout, closing container stdin');
|
||||
deps.queue.closeStdin(task.chat_jid);
|
||||
}, IDLE_TIMEOUT);
|
||||
};
|
||||
|
||||
try {
|
||||
const output = await runContainerAgent(
|
||||
group,
|
||||
@@ -98,17 +111,33 @@ async function runTask(
|
||||
groupFolder: task.group_folder,
|
||||
chatJid: task.chat_jid,
|
||||
isMain,
|
||||
isScheduledTask: true,
|
||||
},
|
||||
(proc, containerName) => deps.onProcess(task.chat_jid, proc, containerName, task.group_folder),
|
||||
async (streamedOutput: ContainerOutput) => {
|
||||
if (streamedOutput.result) {
|
||||
result = streamedOutput.result;
|
||||
// Forward result to user (strip <internal> tags)
|
||||
const text = streamedOutput.result.replace(/<internal>[\s\S]*?<\/internal>/g, '').trim();
|
||||
if (text) {
|
||||
await deps.sendMessage(task.chat_jid, `${deps.assistantName}: ${text}`);
|
||||
}
|
||||
// Only reset idle timer on actual results, not session-update markers
|
||||
resetIdleTimer();
|
||||
}
|
||||
if (streamedOutput.status === 'error') {
|
||||
error = streamedOutput.error || 'Unknown error';
|
||||
}
|
||||
},
|
||||
(proc, containerName) => deps.onProcess(task.chat_jid, proc, containerName),
|
||||
);
|
||||
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
|
||||
if (output.status === 'error') {
|
||||
error = output.error || 'Unknown error';
|
||||
} else if (output.result) {
|
||||
if (output.result.outputType === 'message' && output.result.userMessage) {
|
||||
await deps.sendMessage(task.chat_jid, `${ASSISTANT_NAME}: ${output.result.userMessage}`);
|
||||
}
|
||||
result = output.result.userMessage || output.result.internalLog || null;
|
||||
// Messages are sent via MCP tool (IPC), result text is just logged
|
||||
result = output.result;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
@@ -116,6 +145,7 @@ async function runTask(
|
||||
'Task completed',
|
||||
);
|
||||
} catch (err) {
|
||||
if (idleTimer) clearTimeout(idleTimer);
|
||||
error = err instanceof Error ? err.message : String(err);
|
||||
logger.error({ taskId: task.id, error }, 'Task failed');
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
export interface AdditionalMount {
|
||||
hostPath: string; // Absolute path on host (supports ~ for home)
|
||||
containerPath: string; // Path inside container (under /workspace/extra/)
|
||||
containerPath?: string; // Optional — defaults to basename of hostPath. Mounted at /workspace/extra/{value}
|
||||
readonly?: boolean; // Default: true for safety
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user