diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index f05a6f4..8c377c8 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -17,9 +17,35 @@ interface ContainerInput { isScheduledTask?: boolean; } +interface AgentResponse { + status: 'responded' | 'silent'; + userMessage?: string; + internalLog?: string; +} + +const AGENT_RESPONSE_SCHEMA = { + type: 'object', + properties: { + status: { + type: 'string', + enum: ['responded', 'silent'], + description: 'Use "responded" when you have a message for the user. Use "silent" when the messages don\'t require a response (e.g. the conversation is between other people and doesn\'t involve you, or no trigger/mention was directed at you).', + }, + userMessage: { + type: 'string', + description: 'The message to send to the user. Required when status is "responded".', + }, + internalLog: { + type: 'string', + description: 'Optional internal note about why you chose this status (for logging, not shown to users).', + }, + }, + required: ['status'], +} as const; + interface ContainerOutput { status: 'success' | 'error'; - result: string | null; + result: AgentResponse | null; newSessionId?: string; error?: string; } @@ -222,7 +248,7 @@ async function main(): Promise { isMain: input.isMain }); - let result: string | null = null; + let result: AgentResponse | null = null; let newSessionId: string | undefined; // Add context for scheduled tasks @@ -253,6 +279,10 @@ async function main(): Promise { }, hooks: { PreCompact: [{ hooks: [createPreCompactHook()] }] + }, + outputFormat: { + type: 'json_schema', + schema: AGENT_RESPONSE_SCHEMA, } } })) { @@ -261,15 +291,25 @@ async function main(): Promise { log(`Session initialized: ${newSessionId}`); } - if ('result' in message && message.result) { - result = message.result as string; + if (message.type === 'result') { + if (message.subtype === 'success' && message.structured_output) { + result = message.structured_output as AgentResponse; + log(`Agent result: status=${result.status}${result.internalLog ? `, log=${result.internalLog}` : ''}`); + } else if (message.subtype === 'error_max_structured_output_retries') { + // Agent couldn't produce valid structured output — fall back to text result + log('Agent failed to produce structured output, falling back to text'); + const textResult = 'result' in message ? (message as { result?: string }).result : null; + if (textResult) { + result = { status: 'responded', userMessage: textResult }; + } + } } } log('Agent completed successfully'); writeOutput({ status: 'success', - result, + result: result ?? { status: 'silent' }, newSessionId }); diff --git a/src/container-runner.ts b/src/container-runner.ts index 224afbe..19a4e28 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -40,9 +40,15 @@ export interface ContainerInput { isMain: boolean; } +export interface AgentResponse { + status: 'responded' | 'silent'; + userMessage?: string; + internalLog?: string; +} + export interface ContainerOutput { status: 'success' | 'error'; - result: string | null; + result: AgentResponse | null; newSessionId?: string; error?: string; } diff --git a/src/group-queue.ts b/src/group-queue.ts index 3e44468..25cb621 100644 --- a/src/group-queue.ts +++ b/src/group-queue.ts @@ -9,12 +9,16 @@ interface QueuedTask { fn: () => Promise; } +const MAX_RETRIES = 5; +const BASE_RETRY_MS = 5000; + interface GroupState { active: boolean; pendingMessages: boolean; pendingTasks: QueuedTask[]; process: ChildProcess | null; containerName: string | null; + retryCount: number; } export class GroupQueue { @@ -34,6 +38,7 @@ export class GroupQueue { pendingTasks: [], process: null, containerName: null, + retryCount: 0, }; this.groups.set(groupJid, state); } @@ -126,22 +131,15 @@ export class GroupQueue { try { if (this.processMessagesFn) { const success = await this.processMessagesFn(groupJid); - if (!success) { - logger.info({ groupJid }, 'Processing failed, scheduling retry'); - setTimeout(() => { - if (!this.shuttingDown) { - this.enqueueMessageCheck(groupJid); - } - }, 5000); + if (success) { + state.retryCount = 0; + } else { + this.scheduleRetry(groupJid, state); } } } catch (err) { logger.error({ groupJid, err }, 'Error processing messages for group'); - setTimeout(() => { - if (!this.shuttingDown) { - this.enqueueMessageCheck(groupJid); - } - }, 5000); + this.scheduleRetry(groupJid, state); } finally { state.active = false; state.process = null; @@ -174,6 +172,29 @@ export class GroupQueue { } } + private scheduleRetry(groupJid: string, state: GroupState): void { + state.retryCount++; + if (state.retryCount > MAX_RETRIES) { + logger.error( + { groupJid, retryCount: state.retryCount }, + 'Max retries exceeded, dropping messages (will retry on next incoming message)', + ); + state.retryCount = 0; + return; + } + + const delayMs = BASE_RETRY_MS * Math.pow(2, state.retryCount - 1); + logger.info( + { groupJid, retryCount: state.retryCount, delayMs }, + 'Scheduling retry with backoff', + ); + setTimeout(() => { + if (!this.shuttingDown) { + this.enqueueMessageCheck(groupJid); + } + }, delayMs); + } + private drainGroup(groupJid: string): void { if (this.shuttingDown) return; diff --git a/src/index.ts b/src/index.ts index 1a13ec3..47158a0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,6 +21,7 @@ import { TRIGGER_PATTERN, } from './config.js'; import { + AgentResponse, AvailableGroup, runContainerAgent, writeGroupsSnapshot, @@ -236,22 +237,35 @@ async function processGroupMessages(chatJid: string): Promise { const response = await runAgent(group, prompt, chatJid); await setTyping(chatJid, false); - if (response) { - // Fix batching bug: advance to latest message in batch, not just the trigger - lastAgentTimestamp[chatJid] = - missedMessages[missedMessages.length - 1].timestamp; - saveState(); - await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response}`); - return true; + if (response === 'error') { + // Container or agent error — signal failure so queue can retry with backoff + return false; } - return false; + + // Agent processed messages successfully (whether it responded or stayed silent) + lastAgentTimestamp[chatJid] = + missedMessages[missedMessages.length - 1].timestamp; + saveState(); + + if (response.status === 'responded' && response.userMessage) { + await sendMessage(chatJid, `${ASSISTANT_NAME}: ${response.userMessage}`); + } + + if (response.internalLog) { + logger.info( + { group: group.name, agentStatus: response.status }, + `Agent: ${response.internalLog}`, + ); + } + + return true; } async function runAgent( group: RegisteredGroup, prompt: string, chatJid: string, -): Promise { +): Promise { const isMain = group.folder === MAIN_GROUP_FOLDER; const sessionId = sessions[group.folder]; @@ -303,13 +317,13 @@ async function runAgent( { group: group.name, error: output.error }, 'Container agent error', ); - return null; + return 'error'; } - return output.result; + return output.result ?? { status: 'silent' }; } catch (err) { logger.error({ group: group.name, err }, 'Agent error'); - return null; + return 'error'; } } @@ -740,8 +754,9 @@ async function connectWhatsApp(): Promise { onProcess: (groupJid, proc, containerName) => queue.registerProcess(groupJid, proc, containerName), }); startIpcWatcher(); - startMessageLoop(); + queue.setProcessMessagesFn(processGroupMessages); recoverPendingMessages(); + startMessageLoop(); } }); @@ -783,9 +798,6 @@ async function startMessageLoop(): Promise { } messageLoopRunning = true; - // Wire up the queue's message processing function - queue.setProcessMessagesFn(processGroupMessages); - logger.info(`NanoClaw running (trigger: @${ASSISTANT_NAME})`); while (true) { diff --git a/src/task-scheduler.ts b/src/task-scheduler.ts index 3508371..4dc5750 100644 --- a/src/task-scheduler.ts +++ b/src/task-scheduler.ts @@ -103,8 +103,8 @@ async function runTask( if (output.status === 'error') { error = output.error || 'Unknown error'; - } else { - result = output.result; + } else if (output.result) { + result = output.result.userMessage || output.result.internalLog || null; } logger.info(