[FLINK-39875][table-gateway] Restore thread interrupt status in ResultStore #28333

Open
jubins wants to merge 1 commit into apache:master from jubins:j-FLINK-39875-restore-thread-interrupt

Contributor

@jubins jubins commented Jun 5, 2026

What is the purpose of the change

Fixes FLINK-39875 — The processRecord() method in ResultStore catches InterruptedException but does not restore the thread's interrupt status, which violates Java concurrency best practices and could prevent proper shutdown or cancellation of the result retrieval thread.

When a thread blocked in wait() is interrupted, wait() throws InterruptedException and the thread's interrupt status is cleared. According to Java concurrency best practices, code that catches InterruptedException should either propagate the exception or restore the interrupt status by calling Thread.currentThread().interrupt(). Otherwise, the ResultRetrievalThread may not respond properly to shutdown signals sent by the close() method.

This PR restores the interrupt status and ensures the thread exits promptly when interrupted.
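The flag-clearing behaviour described above can be observed in isolation. The following is a minimal sketch, not code from this PR: the class name InterruptStatusDemo and the helper observeFlags() are hypothetical and only illustrate standard java.lang.Thread behaviour.

```java
// Hypothetical demo class; it is not part of Flink or of this PR.
class InterruptStatusDemo {

    /**
     * Interrupts a thread that is blocked in wait() and records the interrupt
     * flag twice inside the catch block: before and after restoring it.
     */
    static boolean[] observeFlags() throws InterruptedException {
        final Object lock = new Object();
        final boolean[] flags = new boolean[2];
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    // Nobody notifies in this demo; the loop guards against spurious wakeups.
                    while (true) {
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    // wait() cleared the flag before throwing.
                    flags[0] = Thread.currentThread().isInterrupted();
                    // Restore it so that callers and loop conditions can see the interrupt.
                    Thread.currentThread().interrupt();
                    flags[1] = Thread.currentThread().isInterrupted();
                }
            }
        });
        waiter.start();
        waiter.interrupt();
        waiter.join();
        return flags;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] flags = observeFlags();
        System.out.println("flag inside catch, before restore: " + flags[0]); // false
        System.out.println("flag inside catch, after restore:  " + flags[1]); // true
    }
}
```

Without the restore call, any later check of the interrupt flag on that thread would report false, which is the situation this PR addresses.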

Brief change log

  • Added Thread.currentThread().interrupt() in processRecord() catch block to restore interrupt status
  • Added early return to skip record processing when interrupted
  • Enhanced ResultRetrievalThread loop condition to check !Thread.interrupted()
  • Added debug logging for interrupt events
  • Created ResultStoreTest with 4 comprehensive test cases:
    • testInterruptHandling() — Verifies proper interrupt handling during execution
    • testProcessRecordWithFullBuffer() — Tests buffer management with multiple records
    • testInterruptDuringBufferWait() — Validates interrupt response when waiting for buffer space
    • testGracefulShutdown() — Ensures quick and clean shutdown
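The first three items of the change log can be sketched roughly as follows. This is an illustrative stand-in under stated assumptions, not the actual ResultStore code: the class BufferedStoreSketch, its fields, and its methods are all hypothetical. The sketch also checks Thread.currentThread().isInterrupted() in the loop condition, which only reads the flag, whereas Thread.interrupted() additionally clears it as a side effect.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.stream.Stream;

// Hypothetical stand-in for a buffered result store; this is not Flink's ResultStore.
class BufferedStoreSketch {
    private final Object lock = new Object();
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int maxBufferSize;
    private final Thread retrievalThread;
    private volatile boolean running = true;

    BufferedStoreSketch(Iterator<String> source, int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
        this.retrievalThread = new Thread(() -> {
            // Stop on close() or on a pending interrupt; isInterrupted() does not clear the flag.
            while (running && !Thread.currentThread().isInterrupted() && source.hasNext()) {
                processRecord(source.next());
            }
        }, "retrieval-sketch");
    }

    void start() {
        retrievalThread.start();
    }

    private void processRecord(String record) {
        synchronized (lock) {
            while (running && buffer.size() >= maxBufferSize) {
                try {
                    lock.wait(); // wait for a consumer to free buffer space
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                    return; // early return: skip this record
                }
            }
            if (!running) {
                return;
            }
            buffer.add(record);
        }
    }

    /** Removes one buffered record (or returns null) and wakes the producer. */
    String poll() {
        synchronized (lock) {
            String record = buffer.poll();
            lock.notifyAll();
            return record;
        }
    }

    int bufferedCount() {
        synchronized (lock) {
            return buffer.size();
        }
    }

    /** Signals shutdown and reports whether the thread exited within the timeout. */
    boolean close(long timeoutMillis) throws InterruptedException {
        running = false;
        retrievalThread.interrupt();
        retrievalThread.join(timeoutMillis);
        return !retrievalThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // An endless source keeps the producer blocked on a full buffer until close().
        BufferedStoreSketch store =
                new BufferedStoreSketch(Stream.generate(() -> "row").iterator(), 2);
        store.start();
        Thread.sleep(100);
        System.out.println("stopped in time: " + store.close(5_000));
    }
}
```

In this sketch the early return matters because the producer is blocked on a full buffer when close() is called; without the return it would go back to waiting for space that no consumer will free.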

Verifying this change

This change is covered by 4 new unit tests in ResultStoreTest:

  • testInterruptHandling() — Verifies that when the ResultStore is closed, the retrieval thread properly handles the interrupt and stops running
  • testProcessRecordWithFullBuffer() — Tests that records are processed correctly when the buffer fills up and waits are required
  • testInterruptDuringBufferWait() — Validates that interrupts are handled promptly even when the thread is waiting for buffer space
  • testGracefulShutdown() — Ensures that closing the ResultStore completes quickly (under 1 second), demonstrating proper interrupt handling

All tests use timeout guards (10 seconds) to prevent hanging if interrupt handling fails.
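A timeout guard of this kind can be approximated in plain Java as shown below. This is only a sketch of the idea and not the contents of ResultStoreTest: ShutdownTimingSketch, finishesWithin(), and measureShutdownMillis() are hypothetical names, and the worker thread is a simplified placeholder for the retrieval thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; it is not taken from ResultStoreTest.
class ShutdownTimingSketch {

    /** Runs the body on a helper thread; returns false if it exceeds the guard. */
    static boolean finishesWithin(Callable<Void> body, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> future = executor.submit(body);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the body hung, e.g. because an interrupt was swallowed
        } finally {
            executor.shutdownNow(); // interrupts the helper thread if it is still running
        }
    }

    /** Starts a worker that blocks until interrupted, then times interrupt plus join. */
    static long measureShutdownMillis() throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // placeholder for a blocking wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        });
        worker.start();
        long start = System.nanoTime();
        worker.interrupt();
        worker.join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        boolean ok = finishesWithin(() -> {
            long elapsed = measureShutdownMillis();
            if (elapsed >= 1_000) {
                throw new AssertionError("shutdown took " + elapsed + " ms");
            }
            return null;
        }, 10_000);
        System.out.println("finished within guard: " + ok);
    }
}
```

The outer guard only bounds how long a broken implementation can hang; the inner check on elapsed time is what asserts that shutdown is prompt.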

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no; ResultStore is an internal class in the SQL Gateway
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no — only affects error/shutdown paths
  • Anything that affects deployment or recovery (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper): no — limited to SQL Gateway result handling
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?

  • Yes — Claude Code was used as a pair-programming assistant. All code was written, understood, and verified by the author.

Generated-by: Claude Opus 4.8

Collaborator

flinkbot commented Jun 5, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build
