Skip to content

feat(aws_s3): add sqs.message_timeout visibility heartbeat#4469

Open
peczenyj wants to merge 5 commits into
redpanda-data:mainfrom
peczenyj:s3-input-sqs-message-timeout-heartbeat
Open

feat(aws_s3): add sqs.message_timeout visibility heartbeat#4469
peczenyj wants to merge 5 commits into
redpanda-data:mainfrom
peczenyj:s3-input-sqs-message-timeout-heartbeat

Conversation

@peczenyj

Copy link
Copy Markdown
Contributor

The aws_s3 input (SQS-driven mode) had no way to extend the visibility timeout of an in-flight notification while its S3 object was being processed. If download + scan + downstream delivery exceeded the queue's visibility timeout, SQS redelivered the notification and the whole object was reprocessed, producing duplicate records. This is especially easy to hit when the output batches on a time period, since even a small object can take the full batch period to ack.

Add an sqs.message_timeout field, mirroring the aws_sqs input: when greater than zero it sets the visibility timeout on each received notification and a background loop refreshes it (once half the timeout has elapsed) via ChangeMessageVisibility until every record scanned from the object has been acked or nacked. Because one notification fans out to N records with the SQS delete deferred until all resolve, tracking is keyed to the notification's receipt handle and removed in the existing shared ack/nack closures, after which the handle is invalid.

Defaults to 0s (disabled), preserving existing behaviour. Also wires awsS3Reader.Close through to the target reader's Close so the refresh goroutine is stopped and pending notifications are released on shutdown.

Closes #4468

peczenyj and others added 2 commits May 29, 2026 20:48
The aws_s3 input (SQS-driven mode) had no way to extend the visibility
timeout of an in-flight notification while its S3 object was being
processed. If download + scan + downstream delivery exceeded the queue's
visibility timeout, SQS redelivered the notification and the whole object
was reprocessed, producing duplicate records. This is especially easy to
hit when the output batches on a time period, since even a small object
can take the full batch period to ack.

Add an sqs.message_timeout field, mirroring the aws_sqs input: when
greater than zero it sets the visibility timeout on each received
notification and a background loop refreshes it (once half the timeout
has elapsed) via ChangeMessageVisibility until every record scanned from
the object has been acked or nacked. Because one notification fans out to
N records with the SQS delete deferred until all resolve, tracking is
keyed to the notification's receipt handle and removed in the existing
shared ack/nack closures, after which the handle is invalid.

Defaults to 0s (disabled), preserving existing behaviour. Also wires
awsS3Reader.Close through to the target reader's Close so the refresh
goroutine is stopped and pending notifications are released on shutdown.

Refs redpanda-data#4468

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@peczenyj peczenyj changed the title aws_s3: add sqs.message_timeout visibility heartbeat feat(aws_s3): add sqs.message_timeout visibility heartbeat Jun 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

aws_s3 input: support SQS visibility-timeout heartbeat (message_timeout) to avoid reprocessing slow/large objects

1 participant