[DNM]#5039
Conversation
|
Skipping CI for Draft Pull Request. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request implements a periodic syncpoint feature for the Kafka consumer, allowing it to maintain consistent upstream/downstream snapshot pairs in a downstream MySQL-compatible database. The implementation includes new configuration flags, a syncpoint storage layer, and writer logic to ensure events are flushed up to specific timestamps. Reviewer feedback suggests optimizing the database schema by using BIGINT UNSIGNED for timestamp columns to prevent truncation and improve query performance, removing a redundant sort in the DDL processing logic, and throttling the cleanup of old syncpoint records to reduce the performance impact on the downstream database.
| return 0, err | ||
| } | ||
| query := fmt.Sprintf( | ||
| "SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY CAST(primary_ts AS UNSIGNED) DESC LIMIT 1", |
There was a problem hiding this comment.
Using CAST(primary_ts AS UNSIGNED) in the ORDER BY clause prevents the database from using the index on the primary key (consumer_id, topic, primary_ts). If the column type is changed to BIGINT UNSIGNED (as suggested elsewhere), this cast can be removed, allowing for a much more efficient index-backed sort.
| "SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY CAST(primary_ts AS UNSIGNED) DESC LIMIT 1", | |
| "SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY primary_ts DESC LIMIT 1", |
| primary_ts varchar(18), | ||
| secondary_ts varchar(18), |
There was a problem hiding this comment.
The primary_ts and secondary_ts columns are defined as varchar(18). However, a 64-bit unsigned integer (TSO) can have up to 20 digits. Using varchar(18) may lead to truncation or insertion errors as TSOs grow over time. Additionally, storing numeric timestamps as strings makes sorting and range queries inefficient. It is recommended to use BIGINT UNSIGNED for these columns to ensure future-proofing and better performance.
| primary_ts varchar(18), | |
| secondary_ts varchar(18), | |
| primary_ts BIGINT UNSIGNED, | |
| secondary_ts BIGINT UNSIGNED, |
| if s.retention > 0 { | ||
| cleanupQuery := fmt.Sprintf( | ||
| "DELETE IGNORE FROM %s.%s WHERE consumer_id = ? AND topic = ? AND created_at < (NOW() - INTERVAL %d SECOND)", | ||
| filter.TiCDCSystemSchema, | ||
| consumerSyncpointTable, | ||
| int64(s.retention.Seconds()), | ||
| ) | ||
| if _, err = tx.ExecContext(ctx, cleanupQuery, s.consumerID, s.topic); err != nil { | ||
| log.Warn("cleanup stale consumer syncpoint records failed", zap.Error(err)) | ||
| } | ||
| } |
There was a problem hiding this comment.
The cleanup of stale syncpoint records is performed on every Write call within the same transaction. This is inefficient and can increase the latency of syncpoint writes, especially if the table contains many records. It is recommended to throttle this cleanup logic (e.g., run it only once per hour) to reduce the load on the downstream database, similar to the implementation in the standard MySQL sink.
| if len(w.ddlList) > 1 { | ||
| sort.SliceStable(w.ddlList, func(i, j int) bool { | ||
| return w.ddlList[i].GetCommitTs() < w.ddlList[j].GetCommitTs() | ||
| }) | ||
| } |
There was a problem hiding this comment.
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note