refactor(flagd): replace Thread.sleep with ScheduledExecutorService in SyncStreamQueueSource#1734
refactor(flagd): replace Thread.sleep with ScheduledExecutorService in SyncStreamQueueSource#1734
Conversation
e6ecfe7 to
45a4de2
Compare
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the SyncStreamQueueSource to improve thread management and enable cleaner shutdowns. By replacing a blocking Thread.sleep with a ScheduledExecutorService, the system avoids tying up threads during backoff periods, leading to more efficient resource utilization and better responsiveness to shutdown requests. Highlights
Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request refactors SyncStreamQueueSource to replace a blocking Thread.sleep with a non-blocking ScheduledExecutorService for handling retry backoff. This is a great improvement for resource management and enables a cleaner shutdown process. The implementation is solid, including handling for RejectedExecutionException during shutdown and correctly managing the thread's interrupted status. I have one suggestion to make the shutdown sequence even more robust by ensuring the scheduler has terminated before shutting down other components.
...e/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java
Show resolved
Hide resolved
…n SyncStreamQueueSource - Add ScheduledExecutorService field (flagd-sync-retry-scheduler daemon thread) - Replace Thread.sleep(maxBackoffMs) throttle with scheduler.schedule() - When throttle is needed, schedule the next observeSyncStream() invocation and return from the current one (non-blocking) - Use retryScheduler.execute() in init() instead of a raw daemon thread - Shut down retryScheduler in shutdown() via shutdownNow() - Handle RejectedExecutionException on schedule (race with shutdown) - Restore Thread.currentThread().interrupt() on InterruptedException Closes #1659 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
a080df1 to
e054aa6
Compare
Add retryScheduler.awaitTermination(deadline, MILLISECONDS) after shutdownNow() to ensure the scheduler thread has fully stopped before the gRPC channel is torn down, preventing race conditions during shutdown. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
e054aa6 to
194eaf2
Compare
Closes #1659
Supersedes #1660
Changes
Replaces the blocking
Thread.sleep(maxBackoffMs)throttle inSyncStreamQueueSource.observeSyncStream()with a non-blockingScheduledExecutorService.Key changes to
SyncStreamQueueSourceScheduledExecutorServicefield (flagd-sync-retry-scheduler, daemon thread)init(): submit initialobserveSyncStream()run to the scheduler instead of spawning a raw daemon threadobserveSyncStream(): when throttle is needed, schedule the next invocation viaretryScheduler.schedule()and return (non-blocking) instead of sleepingshutdown(): callretryScheduler.shutdownNow()before shutting down the channel connectorRejectedExecutionExceptiononschedule()(race condition between throttle and shutdown)Thread.currentThread().interrupt()onInterruptedExceptionWhy
Thread.sleepblocks the thread for the entire backoff duration, preventing clean shutdown and wasting a thread. The scheduler approach is non-blocking and integrates cleanly with the existing shutdown logic.