dispatcher,event,cloudstorage: add DML two-stage ack #4263

3AceShowHand wants to merge 4 commits into pingcap:master
Conversation

[APPROVALNOTIFIER] This PR is NOT APPROVED.
Summary of Changes (Gemini Code Assist): This pull request introduces a two-stage acknowledgment mechanism for DML events, separating the acknowledgment of an event being enqueued into the sink pipeline from its final flush to the downstream. The change aims to improve dispatcher responsiveness by allowing it to process subsequent events earlier, while checkpoint advancement still relies on the full flush.
📝 Walkthrough (CodeRabbit): Introduces a PostEnqueue stage for DMLEvent with atomic guarding, shifts enqueue callbacks to be invoked at enqueue time, preserves enqueue state across filtering, updates the dispatcher/sink to use PostEnqueue, and adds tests validating concurrency and lifecycle.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Dispatcher
    participant DMLEvent
    participant Worker
    participant Sink
    Dispatcher->>DMLEvent: AddPostEnqueueFunc(wakeCallback)
    Dispatcher->>Worker: Enqueue DML event (fragment)
    Worker->>DMLEvent: PostEnqueue()
    DMLEvent->>Dispatcher: Execute enqueue callbacks (once)
    Worker->>Sink: Process event (defragment/emit)
    Worker->>DMLEvent: PostFlush()
    DMLEvent->>DMLEvent: Ensure PostEnqueue() ran (no-op)
    DMLEvent->>Sink: Execute flush callbacks
```
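The lifecycle in the diagram can be sketched in a few lines of Go. This is an illustrative model only: the type, field names, and the CAS-guarded `PostEnqueue` follow the PR description, not the repository's actual definitions.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dmlEvent is a hypothetical stand-in for the PR's DMLEvent.
type dmlEvent struct {
	postEnqueueDone atomic.Bool // CAS guard: enqueue callbacks fire exactly once
	postTxnEnqueued []func()    // fired when the event enters the sink queue
	postTxnFlushed  []func()    // fired after the downstream flush completes
}

func (e *dmlEvent) AddPostEnqueueFunc(f func()) { e.postTxnEnqueued = append(e.postTxnEnqueued, f) }
func (e *dmlEvent) AddPostFlushFunc(f func())   { e.postTxnFlushed = append(e.postTxnFlushed, f) }

// PostEnqueue runs the enqueue callbacks at most once, even if called
// concurrently, by claiming the guard with a single CompareAndSwap.
func (e *dmlEvent) PostEnqueue() {
	if !e.postEnqueueDone.CompareAndSwap(false, true) {
		return
	}
	for _, f := range e.postTxnEnqueued {
		f()
	}
}

// PostFlush falls back to PostEnqueue first (a no-op if it already ran),
// then runs the flush callbacks.
func (e *dmlEvent) PostFlush() {
	e.PostEnqueue()
	for _, f := range e.postTxnFlushed {
		f()
	}
}

func main() {
	e := &dmlEvent{}
	e.AddPostEnqueueFunc(func() { fmt.Println("woke dispatcher") })
	e.AddPostFlushFunc(func() { fmt.Println("advanced checkpoint") })
	e.PostEnqueue() // worker acks the enqueue early
	e.PostFlush()   // flush completes later; enqueue callbacks do not re-fire
}
```

The design point is that the dispatcher's wake callback rides on the first stage, while checkpoint advancement stays on the second.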
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Code Review
This pull request introduces a two-stage acknowledgement mechanism for DML events by adding a PostEnqueue callback, which is triggered earlier than the existing PostFlush callback. This allows for better pipelining and potentially higher throughput. The changes are implemented across the dispatcher, event, and cloudstorage sink components.
The core logic change appears correct and is supported by new unit tests. However, I've identified a critical race condition in the implementation of PostEnqueue which could lead to callbacks being executed multiple times under concurrent access. I've provided suggestions to fix this using atomic operations. I've also suggested enhancing the tests to cover concurrent scenarios to prevent similar issues in the future.
```go
func TestDMLEventPostEnqueueFuncs(t *testing.T) {
	t.Parallel()

	event := &DMLEvent{}
	var called int64
	event.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&called, 1)
	})
	event.AddPostEnqueueFunc(func() {
		atomic.AddInt64(&called, 1)
	})

	event.PostEnqueue()
	event.PostEnqueue()

	require.Equal(t, int64(2), atomic.LoadInt64(&called))
}
```
This test is good for verifying the sequential logic, but it doesn't cover the case of concurrent calls to PostEnqueue, where a race condition exists in the current implementation. After addressing the race condition I pointed out in dml_event.go, it would be beneficial to add a test case that calls PostEnqueue and/or PostFlush from multiple goroutines to ensure the fix is effective and the implementation is thread-safe.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/common/event/dml_event.go`:
- Around line 395-402: The boolean guard postEnqueueCalled in the DML event
struct is not concurrency-safe and can cause double-invokes/races; replace the
bool guard with a sync.Once (or an atomic CAS) and update the PostEnqueue
execution paths to call once.Do(...) so callbacks in PostTxnEnqueued and
PostTxnFlushed are invoked exactly once; apply the same change to the other
occurrence referenced around lines 645-653, ensuring the struct holds a
sync.Once (e.g., postEnqueueOnce) and all code paths use that Once to run the
callbacks instead of checking/setting postEnqueueCalled.
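A minimal sketch of the `sync.Once` shape the prompt suggests; the struct and field names (`postEnqueueOnce`, `postTxnEnqueued`) are assumptions drawn from the comment, not the repository's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// dmlEventOnce models the suggested fix: every execution path funnels
// through once.Do, so the callbacks run exactly once no matter how many
// callers race on PostEnqueue.
type dmlEventOnce struct {
	postEnqueueOnce sync.Once
	postTxnEnqueued []func()
}

func (e *dmlEventOnce) PostEnqueue() {
	e.postEnqueueOnce.Do(func() {
		for _, f := range e.postTxnEnqueued {
			f()
		}
	})
}

func main() {
	e := &dmlEventOnce{}
	e.postTxnEnqueued = append(e.postTxnEnqueued, func() { fmt.Println("enqueue ack") })
	e.PostEnqueue()
	e.PostEnqueue() // no-op: the Once has already fired
}
```

One trade-off versus a bare CAS: `Once.Do` blocks losing callers until the winner's callbacks finish, whereas a CAS guard lets losers return immediately while the winner runs the callbacks.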
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/sink/cloudstorage/dml_writers.go
- pkg/common/event/dml_event.go
- pkg/common/event/dml_event_test.go
/test all
🧹 Nitpick comments (1)
pkg/common/event/dml_event.go (1)
Lines 693-704: `ClearPostEnqueueFunc` inconsistency: the method name suggests a reset, but it doesn't reset the `postEnqueueCalled` guard.

While `ClearPostEnqueueFunc` clears the callback slice, it doesn't reset the atomic guard that prevents re-execution. If callbacks are cleared after `PostEnqueue()` has already fired and new ones are added, they will never execute. However, this method is not used anywhere in the codebase and DML events follow a create-once pattern, so this has no practical impact. Consider either documenting that `ClearPostEnqueueFunc` is a one-way operation (callbacks cannot be re-armed after enqueue), or resetting the atomic flag if reuse is intended.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/common/event/dml_event.go` around lines 693 - 704, ClearPostEnqueueFunc currently clears the PostTxnEnqueued slice but does not reset the atomic guard (postEnqueueCalled) that PostEnqueue() uses to prevent re-execution, so clearing then adding callbacks will never run them; update ClearPostEnqueueFunc to also reset the atomic flag (postEnqueueCalled) to its zero/false state when intended to allow re-use, or if re-arm is not desired, document that ClearPostEnqueueFunc is one-way—reference the ClearPostEnqueueFunc method, the PostTxnEnqueued slice, the postEnqueueCalled atomic guard, and the PostEnqueue() execution path when making the change.
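If reuse is intended, the fix could re-arm the guard when clearing, as in this hypothetical sketch (the names mirror the review comment, not the repository's actual code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// evt illustrates the nitpick: without re-arming the guard, callbacks
// added after a clear would never run. This is a sketch, not real code.
type evt struct {
	postEnqueueCalled atomic.Bool
	postTxnEnqueued   []func()
}

func (e *evt) PostEnqueue() {
	if !e.postEnqueueCalled.CompareAndSwap(false, true) {
		return
	}
	for _, f := range e.postTxnEnqueued {
		f()
	}
}

// ClearPostEnqueueFunc drops the callbacks and re-arms the guard so the
// event can be reused; the alternative is to document it as one-way.
func (e *evt) ClearPostEnqueueFunc() {
	e.postTxnEnqueued = e.postTxnEnqueued[:0]
	e.postEnqueueCalled.Store(false)
}

func main() {
	e := &evt{}
	e.postTxnEnqueued = append(e.postTxnEnqueued, func() { fmt.Println("first") })
	e.PostEnqueue()

	e.ClearPostEnqueueFunc()
	e.postTxnEnqueued = append(e.postTxnEnqueued, func() { fmt.Println("second") })
	e.PostEnqueue() // fires again because the guard was reset
}
```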
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- downstreamadapter/sink/cloudstorage/defragmenter.go
- downstreamadapter/sink/cloudstorage/dml_writers_test.go
- pkg/common/event/active_active.go
- pkg/common/event/active_active_test.go
- pkg/common/event/dml_event.go
- pkg/common/event/dml_event_test.go
/test all

/test all
🧹 Nitpick comments (1)
downstreamadapter/sink/cloudstorage/dml_writers.go (1)
Line 116: Fix a typo in the inline comment. The comment says `encoupled`; please change it to `coupled` for clarity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/sink/cloudstorage/dml_writers.go` at line 116, Update the inline comment that currently reads "emit a TxnCallbackableEvent encoupled with a sequence number starting from one." to correct the typo by replacing "encoupled" with "coupled" in the comment near the TxnCallbackableEvent emission in dml_writers.go so it reads "emit a TxnCallbackableEvent coupled with a sequence number starting from one."
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- downstreamadapter/sink/cloudstorage/dml_writers.go
- pkg/sink/cloudstorage/config.go
- pkg/sink/cloudstorage/config_test.go
/test all

/test all
@3AceShowHand: The following tests failed, say

Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?

Issue Number: close #4269

The dispatcher wake callback for DML was previously tied to `PostFlush`, so wake latency could be dominated by sink flush latency. This coupled scheduling progress too tightly with remote flush speed. This PR introduces a two-stage ack model for DML.

It also fixes three follow-up correctness/semantics issues tracked in #4269:

- The `PostEnqueue` exactly-once guard was not concurrency-safe.

What is changed and how it works?

- `DMLEvent` gains `PostTxnEnqueued []func()`, `PostEnqueue()`, `AddPostEnqueueFunc(...)`, and `ClearPostEnqueueFunc(...)`.
- `PostFlush()` still calls `PostEnqueue()` first as a fallback, but `PostEnqueue` now uses an atomic CAS to guarantee exactly-once execution under concurrency.
- The dispatcher wake callback moves from `AddPostFlushFunc(...)` to `AddPostEnqueueFunc(...)` in `BasicDispatcher.handleEvents`.
- The `FilterDMLEvent` rebuild path now preserves enqueue callbacks/state when constructing the filtered `DMLEvent`.
- Tests cover the `PostEnqueue`/`PostFlush` race and exactly-once behavior.

Behavioral intent:
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No known compatibility break:
Expected performance effect:
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Summary by CodeRabbit
New Features
Improvements
Tests