feat(aws_s3): add sqs.message_timeout visibility heartbeat#4469
Open
peczenyj wants to merge 5 commits into
Open
feat(aws_s3): add sqs.message_timeout visibility heartbeat#4469peczenyj wants to merge 5 commits into
peczenyj wants to merge 5 commits into
Conversation
The aws_s3 input (SQS-driven mode) had no way to extend the visibility timeout of an in-flight notification while its S3 object was being processed. If download + scan + downstream delivery exceeded the queue's visibility timeout, SQS redelivered the notification and the whole object was reprocessed, producing duplicate records. This is especially easy to hit when the output batches on a time period, since even a small object can take the full batch period to ack. Add an sqs.message_timeout field, mirroring the aws_sqs input: when greater than zero it sets the visibility timeout on each received notification and a background loop refreshes it (once half the timeout has elapsed) via ChangeMessageVisibility until every record scanned from the object has been acked or nacked. Because one notification fans out to N records with the SQS delete deferred until all resolve, tracking is keyed to the notification's receipt handle and removed in the existing shared ack/nack closures, after which the handle is invalid. Defaults to 0s (disabled), preserving existing behaviour. Also wires awsS3Reader.Close through to the target reader's Close so the refresh goroutine is stopped and pending notifications are released on shutdown. Refs redpanda-data#4468 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The aws_s3 input (SQS-driven mode) had no way to extend the visibility timeout of an in-flight notification while its S3 object was being processed. If download + scan + downstream delivery exceeded the queue's visibility timeout, SQS redelivered the notification and the whole object was reprocessed, producing duplicate records. This is especially easy to hit when the output batches on a time period, since even a small object can take the full batch period to ack.
Add an sqs.message_timeout field, mirroring the aws_sqs input: when greater than zero it sets the visibility timeout on each received notification and a background loop refreshes it (once half the timeout has elapsed) via ChangeMessageVisibility until every record scanned from the object has been acked or nacked. Because one notification fans out to N records with the SQS delete deferred until all resolve, tracking is keyed to the notification's receipt handle and removed in the existing shared ack/nack closures, after which the handle is invalid.
Defaults to 0s (disabled), preserving existing behaviour. Also wires awsS3Reader.Close through to the target reader's Close so the refresh goroutine is stopped and pending notifications are released on shutdown.
Closes #4468