diff --git a/src/index.ts b/src/index.ts index 5be2ebd..ffb01cb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -522,12 +522,14 @@ async function startMessageLoop(): Promise { for (const msg of messages) { try { await processMessage(msg); + // Only advance timestamp after successful processing for at-least-once delivery + lastTimestamp = msg.timestamp; + saveState(); } catch (err) { - logger.error({ err, msg: msg.id }, 'Error processing message'); + logger.error({ err, msg: msg.id }, 'Error processing message, will retry'); + // Stop processing this batch - failed message will be retried next loop + break; } - // Advance timestamp after each message to avoid reprocessing on retry - lastTimestamp = msg.timestamp; - saveState(); } } catch (err) { logger.error({ err }, 'Error in message loop');