[server] Add DoL loopback to ensure new leader is fully caught up on VT#2314
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a Declaration-of-Leadership (DoL) loopback mechanism to ensure new leader replicas are fully caught up on the Version Topic (VT) before switching to consume from remote VT or Real-Time (RT) topics. This replaces the previous time-based heuristic with a deterministic approach that eliminates the risk of duplicate consumption and data inconsistencies during leader transitions.
Key Changes:
- New DoL control message type with unique GUID that leaders produce to local VT during STANDBY→LEADER transition
- Leader waits to consume its own DoL message back (loopback confirmation) before switching to remote sources
- Configurable rollout via separate flags for system stores and user stores (
SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_SYSTEM_STORESandSERVER_LEADER_HANDOVER_USE_DOL_MECHANISM_FOR_USER_STORES)
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
DolStamp.java |
New class tracking DoL state (produced/consumed flags, leadership term, host ID) during leader transition |
DolGuidGenerator.java |
GUID generator for DoL control messages using UUID type 3 |
DoLStampGuidGenerator.java |
Duplicate GUID generator implementation (identical to DolGuidGenerator) |
KafkaKey.java |
Adds DOL_STAMP constant for DoL control message key |
VeniceWriter.java |
Implements sendDoLStamp() and getDoLStampKME() for producing DoL messages |
StoreIngestionTask.java |
Adds checkAndHandleDoLMessage() to detect and handle consumed DoL messages, validates DoL messages like heartbeats |
LeaderFollowerStoreIngestionTask.java |
Orchestrates DoL mechanism: initializes DoL state, sends DoL stamp, checks readiness in canSwitchToLeaderTopic(), falls back to legacy behavior when DoL disabled |
PartitionConsumptionState.java |
Tracks DoL state and highest observed leadership term per partition |
LeaderFollowerPartitionStateModel.java |
Uses Helix message creation timestamp as leadership term |
SharedKafkaConsumer.java |
Adds region name and index to toString() for better debugging |
ConfigKeys.java |
Defines two new config flags for DoL mechanism enablement |
VeniceServerConfig.java |
Exposes DoL config flags via getters |
VeniceServerWrapper.java |
Enables DoL mechanism for both system and user stores in integration tests |
VeniceClusterWrapper.java |
Increases timeout for version wait from 60s to 120s to accommodate DoL latency |
TestHybrid.java |
Adds timeout and unique store name for log compaction test |
TestHybridMultiRegion.java |
Updates test to use sendEmptyPushAndWait() and improves error message assertion |
TestTopicRequestOnHybridDelete.java |
Removes unused imports and deletes deleteStoreAfterStartedPushAllowsNewPush test |
log4j2.properties |
Updates logging configuration (contains hardcoded user path) |
StoreIngestionTaskTest.java |
Renames test method from resolveRtTopicPartitionWithPubSubBrokerAddress to resolveTopicPartitionWithPubSubBrokerAddress |
SharedKafkaConsumerTest.java |
Updates test to pass region name and index to SharedKafkaConsumer constructor |
ActiveActiveStoreIngestionTask.java |
Updates method calls from resolveRtTopicPartitionWithPubSubBrokerAddress to resolveTopicPartitionWithPubSubBrokerAddress |
KafkaConsumerService.java |
Passes region name and index when creating SharedKafkaConsumer instances |
PartitionWiseKafkaConsumerService.java |
Adds consumer instance to log output for better debugging |
HelixReadWriteSchemaRepository.java |
Adds store name to exception log message |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
0998fab to
1bbff55
Compare
sixpluszero
left a comment
There was a problem hiding this comment.
Thank you for your change! A great work to improve ingestion stability. I left some comments for clarification.
|
Hi there. This pull request has been inactive for 30 days. To keep our review queue healthy, we plan to close it in 7 days unless there is new activity. If you are still working on this, please push a commit, leave a comment, or convert it to draft to signal intent. Thank you for your time and contributions. |
|
Closing this pull request due to 37 days of inactivity. This is not a judgment on the value of the work. If you would like to continue, please reopen or open a new PR and we will be happy to take another look. Thank you again for contributing. |
1bbff55 to
daadb36
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 12 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
5b07505 to
aa253a3
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
aa253a3 to
9c845cd
Compare
9c845cd to
d77efec
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
d77efec to
8cd08d6
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
sixpluszero
left a comment
There was a problem hiding this comment.
Overall lgtm.
Logic wise I think it makes sense. Left some small comments..
83e2c30 to
3011f14
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Add SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM config to enable the new Declaration of Leadership (DoL) mechanism for fast leader handover. Changes: - Add SERVER_LEADER_HANDOVER_USE_DOL_MECHANISM config key in ConfigKeys.java - Add leaderHandoverUseDoLMechanism field and getter in VeniceServerConfig - Refactor canSwitchToLeaderTopic() to check config and route logic - Extract canSwitchToLeaderTopicLegacy() with original time-based logic - Add comprehensive design document for DoL mechanism Default: false (maintains backward compatibility with legacy time-based mechanism) This is step 1 of the DoL implementation. The actual DoL loopback logic will be implemented in subsequent commits when the config is enabled. Address review comments - [guid] Remove duplicate DolGuidGenerator class Consolidated duplicate GUID generator classes: - Removed DolGuidGenerator.java (duplicate) - Updated KafkaKey.java to use DoLStampGuidGenerator - Both VeniceWriter and KafkaKey now use the same DoLStampGuidGenerator class This ensures consistent GUID generation for Declaration of Leadership (DoL) messages across the codebase.
- StoreIngestionTask: Fix NPE when hostName is null in LeaderMetadata - StoreIngestionTask: Use Objects.equals for null-safe host comparison - LeaderFollowerStoreIngestionTask: Add null check for produceResult - LeaderFollowerStoreIngestionTask: Change log level from WARN to DEBUG to avoid log spam during legacy fallback - DoLStampGuidGenerator: Use StandardCharsets.UTF_8 for consistent GUID generation across platforms - ConfigKeys: Fix javadoc to reflect actual default values (true) - VeniceWriter: Fix bug where completedFuture() result was not returned
The comment was only mentioning heartbeat records but the code also skips validation for DoL stamp records. Updated for clarity.
- Fix javadoc: "Consumer/drainer thread" -> "Drainer thread" in PCS - Cache DoL GUID as static final instead of recomputing on every call - Remove commented-out Kafka logger settings in integration test log4j2
Acquire partitionLocks[partition] before producing DoL stamp message to prevent potential races with concurrent writes to the same partition.
…delay The waitForStateVersion background thread polls every 60s by default. Override it to 100ms in the test static block, matching the existing SCHEMA_POLLING_DELAY_MS override.
…0s test delay" This reverts commit 8e98076.
8bed3c1 to
2373ca1
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Thanks a lot for the reviewer, @sixpluszero! |
[server] Add DoL loopback to ensure new leader is fully caught up on VT
Newly elected leaders were re-consuming the NR source topic because the
promotion logic relied only on elapsed time since the last consumed
message before switching to the remote version topic (VT). This
time-based heuristic is insufficient and can cause duplicate consumption
and data inconsistencies.
Fix the issue by requiring the new leader to produce a Declaration-of-
Leadership (DoL) marker to the local VT and wait until it consumes that
same marker back. This provides a deterministic guarantee that the
leader has fully caught up on VT before switching to RT or NR sources.
New Configuration Keys
server.leader.handover.use.dol.mechanism.for.system.storestrueserver.leader.handover.use.dol.mechanism.for.user.storestrueCode changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
synchronized,RWLock) are used where needed. Volatile sufficient for this use case.ConcurrentHashMap,CopyOnWriteArrayList).How was this PR tested?
Does this PR introduce any user-facing or breaking changes?