Skip to content

refactor(flagd): replace Thread.sleep with ScheduledExecutorService in SyncStreamQueueSource#1734

Open
aepfli wants to merge 2 commits intomainfrom
fix/replace-thread-sleep-with-scheduler
Open

refactor(flagd): replace Thread.sleep with ScheduledExecutorService in SyncStreamQueueSource#1734
aepfli wants to merge 2 commits intomainfrom
fix/replace-thread-sleep-with-scheduler

Conversation

@aepfli
Copy link
Member

@aepfli aepfli commented Mar 11, 2026

Closes #1659
Supersedes #1660

Changes

Replaces the blocking Thread.sleep(maxBackoffMs) throttle in SyncStreamQueueSource.observeSyncStream() with a non-blocking ScheduledExecutorService.

Key changes to SyncStreamQueueSource

  • Add a single-threaded ScheduledExecutorService field (flagd-sync-retry-scheduler, daemon thread)
  • init(): submit initial observeSyncStream() run to the scheduler instead of spawning a raw daemon thread
  • observeSyncStream(): when throttle is needed, schedule the next invocation via retryScheduler.schedule() and return (non-blocking) instead of sleeping
  • shutdown(): call retryScheduler.shutdownNow() before shutting down the channel connector
  • Handle RejectedExecutionException on schedule() (race condition between throttle and shutdown)
  • Restore Thread.currentThread().interrupt() on InterruptedException

Why

Thread.sleep blocks the thread for the entire backoff duration, preventing clean shutdown and wasting a thread. The scheduler approach is non-blocking and integrates cleanly with the existing shutdown logic.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the SyncStreamQueueSource to improve thread management and enable cleaner shutdowns. By replacing a blocking Thread.sleep with a ScheduledExecutorService, the system avoids tying up threads during backoff periods, leading to more efficient resource utilization and better responsiveness to shutdown requests.

Highlights

  • Replaced Thread.sleep: The blocking Thread.sleep(maxBackoffMs) throttle in SyncStreamQueueSource.observeSyncStream() was replaced with a non-blocking ScheduledExecutorService.
  • ScheduledExecutorService: A single-threaded ScheduledExecutorService (flagd-sync-retry-scheduler, daemon thread) was added to manage retry delays.
  • Initialization: The init() method now submits the initial observeSyncStream() run to the scheduler instead of spawning a raw daemon thread.
  • Throttling Logic: When throttling is needed, the next invocation is scheduled via retryScheduler.schedule() and the method returns immediately, making it non-blocking.
  • Shutdown: The shutdown() method now calls retryScheduler.shutdownNow() to properly terminate the scheduler.
  • Error Handling: RejectedExecutionException is now handled during scheduling, and Thread.currentThread().interrupt() is restored on InterruptedException.
Changelog
  • providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java
    • Added necessary imports for Executors, RejectedExecutionException, and ScheduledExecutorService.
    • Introduced a ScheduledExecutorService for managing retry delays.
    • Modified the init method to use the scheduler for initial stream observation.
    • Ensured the ScheduledExecutorService is shut down cleanly.
    • Updated the observeSyncStream method to use scheduled retries instead of blocking Thread.sleep.
    • Added handling for RejectedExecutionException during scheduling.
    • Restored the interrupt status of the current thread on InterruptedException.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors SyncStreamQueueSource to replace a blocking Thread.sleep with a non-blocking ScheduledExecutorService for handling retry backoff. This is a great improvement for resource management and enables a cleaner shutdown process. The implementation is solid, including handling for RejectedExecutionException during shutdown and correctly managing the thread's interrupted status. I have one suggestion to make the shutdown sequence even more robust by ensuring the scheduler has terminated before shutting down other components.

…n SyncStreamQueueSource

- Add ScheduledExecutorService field (flagd-sync-retry-scheduler daemon thread)
- Replace Thread.sleep(maxBackoffMs) throttle with scheduler.schedule()
- When throttle is needed, schedule the next observeSyncStream() invocation
  and return from the current one (non-blocking)
- Use retryScheduler.execute() in init() instead of a raw daemon thread
- Shut down retryScheduler in shutdown() via shutdownNow()
- Handle RejectedExecutionException on schedule (race with shutdown)
- Restore Thread.currentThread().interrupt() on InterruptedException

Closes #1659

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
@aepfli aepfli force-pushed the fix/replace-thread-sleep-with-scheduler branch from a080df1 to e054aa6 Compare March 12, 2026 07:59
Add retryScheduler.awaitTermination(deadline, MILLISECONDS) after
shutdownNow() to ensure the scheduler thread has fully stopped before
the gRPC channel is torn down, preventing race conditions during shutdown.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
@aepfli aepfli force-pushed the fix/replace-thread-sleep-with-scheduler branch from e054aa6 to 194eaf2 Compare March 12, 2026 08:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[flagd] Use a scheduler instead of Thread.sleep for retry/backoff in SyncStreamQueueSource

7 participants