[FLINK-39875][table-gateway] Restore thread interrupt status in ResultStore#28333
Open
jubins wants to merge 1 commit into
Open
[FLINK-39875][table-gateway] Restore thread interrupt status in ResultStore#28333jubins wants to merge 1 commit into
jubins wants to merge 1 commit into
Conversation
Collaborator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Fixes FLINK-39875 — The
processRecord()method inResultStorecatchesInterruptedExceptionbut does not restore the thread's interrupt status, which violates Java concurrency best practices and could prevent proper shutdown or cancellation of the result retrieval thread.When a thread's
wait()is interrupted, the interrupt status is cleared. According to Java concurrency best practices, when catchingInterruptedException, code should either propagate the exception or restore the interrupt status by callingThread.currentThread().interrupt(). Failing to do so can cause theResultRetrievalThreadto not respond properly to shutdown signals sent by theclose()method.This PR restores the interrupt status and ensures the thread exits promptly when interrupted.
Brief change log
Thread.currentThread().interrupt()inprocessRecord()catch block to restore interrupt statusreturnto skip record processing when interruptedResultRetrievalThreadloop condition to check!Thread.interrupted()ResultStoreTestwith 4 comprehensive test cases:testInterruptHandling()— Verifies proper interrupt handling during executiontestProcessRecordWithFullBuffer()— Tests buffer management with multiple recordstestInterruptDuringBufferWait()— Validates interrupt response when waiting for buffer spacetestGracefulShutdown()— Ensures quick and clean shutdownVerifying this change
This change is covered by 4 new unit tests in
ResultStoreTest:testInterruptHandling()— Verifies that when the ResultStore is closed, the retrieval thread properly handles the interrupt and stops runningtestProcessRecordWithFullBuffer()— Tests that records are processed correctly when the buffer fills up and waits are requiredtestInterruptDuringBufferWait()— Validates that interrupts are handled promptly even when the thread is waiting for buffer spacetestGracefulShutdown()— Ensures that closing the ResultStore completes quickly (under 1 second), demonstrating proper interrupt handlingAll tests use timeout guards (10 seconds) to prevent hanging if interrupt handling fails.
Does this pull request potentially affect one of the following parts
@Public(Evolving): no —ResultStoreis an internal class in the SQL GatewayDocumentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Opus 4.8