@yoelk yoelk commented Jan 28, 2026

Summary

Fixes data loss in the Kafka source when acknowledgements are enabled and message delivery fails.

Problem

When message delivery failed (e.g., due to authentication errors) with acknowledgements enabled, the Kafka source skipped the failed message instead of retrying it, causing data loss.

The issue occurred because:

  • Offsets were correctly NOT committed for rejected messages (preventing duplicate processing)
  • But the consumer didn't seek back to retry them, so it kept reading past them (causing data loss)
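
To make the failure mode concrete, here is a minimal, self-contained simulation. It is not Vector's code: `consume_without_seek_back` and its parameters are hypothetical, and the model assumes (as a simplification) that a later successful delivery commits an offset past the rejected message.

```rust
/// Simplified model of the buggy behaviour (hypothetical, not Vector's code).
/// `log_len` is the number of messages in the partition; `rejected` lists
/// offsets whose delivery fails.
/// Returns the delivered offsets and the committed offset, where the committed
/// offset is the next offset to read after a restart.
fn consume_without_seek_back(log_len: i64, rejected: &[i64]) -> (Vec<i64>, Option<i64>) {
    let mut delivered = Vec::new();
    let mut committed = None;
    for offset in 0..log_len {
        if rejected.contains(&offset) {
            // The offset is not committed, but nothing rewinds the consumer,
            // so the loop simply moves on to the next message.
            continue;
        }
        delivered.push(offset);
        // Assumption: committing a later offset implicitly covers the gap.
        committed = Some(offset + 1);
    }
    (delivered, committed)
}
```

With three messages and a rejection at offset 1, this model delivers offsets 0 and 2 and ends with a committed offset of 3, so the message at offset 1 is never retried.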

Solution

This PR ensures rejected messages are retried by:

  • Initializing last_committed_offset when the first message is received, so that even the first message can be retried
  • Tracking acknowledgement status and setting a seek-back flag on rejection
  • Preventing new message consumption while seeking back to retry
  • Using a dedicated seek_to_retry_offset() method with proper error handling
  • Using configured timeouts (socket_timeout_ms for seek operations, fetch_wait_max_ms for retry delay)
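
The bookkeeping described above can be sketched as a small state tracker. This is a hedged illustration rather than the PR's implementation: the field names `last_committed_offset` and `need_seek_back` come from the description, while `AckStatus`, `RetryState`, and all method names are hypothetical. It also assumes that `last_committed_offset` holds the next offset to read.

```rust
/// Local stand-in for an acknowledgement result; the variant names follow
/// the PR text, but this enum is defined here only for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AckStatus {
    Delivered,
    Errored,
    Rejected,
}

/// Hypothetical per-partition retry state.
#[derive(Debug, Default)]
struct RetryState {
    /// Assumption: the next offset to read after the last successful commit.
    last_committed_offset: Option<i64>,
    /// Set when a rejection requires rewinding the consumer.
    need_seek_back: bool,
}

impl RetryState {
    /// Record a starting point on the first message so it can be retried too.
    fn on_message(&mut self, offset: i64) {
        if self.last_committed_offset.is_none() {
            self.last_committed_offset = Some(offset);
        }
    }

    /// Advance the commit point on success; request a seek-back otherwise.
    fn on_ack(&mut self, offset: i64, status: AckStatus) {
        match status {
            AckStatus::Delivered => self.last_committed_offset = Some(offset + 1),
            AckStatus::Errored | AckStatus::Rejected => self.need_seek_back = true,
        }
    }

    /// Guard: no new messages are consumed while a seek-back is pending.
    fn may_consume(&self) -> bool {
        !self.need_seek_back
    }

    /// Offset to rewind to, if a seek-back is pending.
    fn retry_offset(&self) -> Option<i64> {
        if self.need_seek_back {
            self.last_committed_offset
        } else {
            None
        }
    }

    /// Clear the flag only after the seek has actually succeeded.
    fn seek_succeeded(&mut self) {
        self.need_seek_back = false;
    }
}
```

Keeping the flag set until `seek_succeeded` is called means a failed seek leaves the guard in place, so consumption stays paused until the rewind actually happens.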

Changes

  • Add last_committed_offset tracking to remember last successful commit
  • Add need_seek_back flag to trigger retry mechanism
  • Enhance acknowledgement handling to differentiate Delivered vs Errored/Rejected
  • Add guard to prevent consuming new messages during seek-back
  • Initialize offset tracking on first message
  • Add seek_to_retry_offset() method for clean separation of concerns
  • Add integration test seeks_back_on_rejected_message to validate the fix
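
As a rough sketch of how a dedicated seek step might combine the two configured timeouts, consider the following. Only the names `seek_to_retry_offset`, `socket_timeout_ms`, and `fetch_wait_max_ms` come from the description; the signature, the `SeekableConsumer` trait, the `Timeouts` struct, and the `FlakyConsumer` test double are assumptions made for illustration and do not reflect the real Kafka client API.

```rust
use std::time::Duration;

/// Hypothetical abstraction over a consumer that can be rewound.
trait SeekableConsumer {
    fn seek(&mut self, offset: i64, timeout: Duration) -> Result<(), String>;
}

/// The two configured timeouts named in the PR description.
struct Timeouts {
    socket_timeout_ms: u64,
    fetch_wait_max_ms: u64,
}

/// Try to rewind to `retry_offset`.
/// Returns Ok(()) on success, or Err(delay) with the time to wait before
/// the caller tries again.
fn seek_to_retry_offset<C: SeekableConsumer>(
    consumer: &mut C,
    retry_offset: i64,
    timeouts: &Timeouts,
) -> Result<(), Duration> {
    let seek_timeout = Duration::from_millis(timeouts.socket_timeout_ms);
    match consumer.seek(retry_offset, seek_timeout) {
        Ok(()) => Ok(()),
        // On failure, report the retry delay instead of giving up.
        Err(_reason) => Err(Duration::from_millis(timeouts.fetch_wait_max_ms)),
    }
}

/// Test double: fails a fixed number of seeks, then succeeds.
struct FlakyConsumer {
    failures_left: u32,
    position: i64,
    last_timeout: Option<Duration>,
}

impl SeekableConsumer for FlakyConsumer {
    fn seek(&mut self, offset: i64, timeout: Duration) -> Result<(), String> {
        self.last_timeout = Some(timeout);
        if self.failures_left > 0 {
            self.failures_left -= 1;
            return Err("simulated seek failure".to_string());
        }
        self.position = offset;
        Ok(())
    }
}
```

Returning the delay rather than sleeping inside the function keeps the sketch free of any async runtime; the caller decides how to wait.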

Testing

  • ✅ All existing Kafka unit tests pass (5/5)
  • ✅ New integration test seeks_back_on_rejected_message validates the retry mechanism
  • ✅ Clean build with no warnings
  • ✅ Clippy passes with no warnings
  • ✅ Manually tested with reproduction scenario (authentication failures)

The integration test verifies:

  • Messages are retried after rejection
  • All messages are eventually received
  • Offsets are committed correctly after retry
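
The three properties above can be illustrated with a small in-memory simulation. This is not the `seeks_back_on_rejected_message` test itself, which runs against a real Kafka broker. The function `consume_with_seek_back` and its parameters are hypothetical, and the model assumes one message in flight at a time and that each listed offset is rejected exactly once.

```rust
use std::collections::HashSet;

/// Simplified model of the fixed behaviour (hypothetical, not Vector's code).
/// `reject_once` lists offsets whose first delivery attempt fails.
/// Returns the delivered offsets, the final committed offset (next offset to
/// read), and the number of seek-backs performed.
fn consume_with_seek_back(log_len: i64, reject_once: &[i64]) -> (Vec<i64>, Option<i64>, usize) {
    let mut pending_rejections: HashSet<i64> = reject_once.iter().copied().collect();
    let mut delivered = Vec::new();
    let mut last_committed: Option<i64> = None;
    let mut seek_backs = 0;
    let mut position = 0;

    while position < log_len {
        // Initialize tracking on the first message so it can be retried.
        if last_committed.is_none() {
            last_committed = Some(position);
        }
        if pending_rejections.remove(&position) {
            // Rejected: rewind to the last committed offset and try again.
            // With one message in flight, this re-reads the same offset.
            position = last_committed.unwrap_or(position);
            seek_backs += 1;
            continue;
        }
        // Delivered: record the message and advance the commit point.
        delivered.push(position);
        last_committed = Some(position + 1);
        position += 1;
    }
    (delivered, last_committed, seek_backs)
}
```

For a four-message log with first-attempt rejections at offsets 0 and 2, the model delivers every offset in order, performs two seek-backs, and finishes with a committed offset of 4.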

Checklist

  • Code follows Vector's style guidelines
  • Tests added to validate the fix
  • Documentation updated (inline comments)
  • No breaking changes
  • Conventional commit format used

Fixes #24543

…ledgements

When acknowledgements are enabled and a message delivery fails (e.g., due to
authentication errors), the Kafka source now properly seeks back to retry the
failed message instead of skipping it.

Previously, when a message was rejected:
- The offset was correctly NOT committed (preventing duplicate processing)
- But the consumer didn't seek back to retry the message (causing data loss)

This fix ensures rejected messages are retried by:
- Initializing last_committed_offset on first message to enable retry
- Tracking acknowledgement status and setting a seek-back flag on rejection
- Preventing new message consumption while seeking back
- Using a dedicated seek_to_retry_offset() method with proper error handling
- Using configured timeouts (socket_timeout_ms for seek, fetch_wait_max_ms for retry delay)

@yoelk yoelk requested a review from a team as a code owner January 28, 2026 19:26
@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Jan 28, 2026

yoelk commented Jan 28, 2026

I have read the CLA Document and I hereby sign the CLA

Development

Successfully merging this pull request may close these issues.

[BUG] Kafka source loses messages when sink returns 401/403 with acknowledgements enabled
