[DNM]#5039

Draft
lidezhu wants to merge 3 commits into master from poc/kafka-consumer-syncpoint

Conversation

@lidezhu
Collaborator

@lidezhu lidezhu commented May 13, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

@ti-chi-bot

ti-chi-bot Bot commented May 13, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot Bot added the do-not-merge/needs-linked-issue, release-note (Denotes a PR that will be considered when it comes time to generate release notes.), and do-not-merge/work-in-progress (Indicates that a PR should not merge because it is a work in progress.) labels May 13, 2026
@ti-chi-bot

ti-chi-bot Bot commented May 13, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign asddongmen for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Contributor

coderabbitai Bot commented May 13, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 41a62084-ef15-4aff-9869-343b338da0c1

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label May 13, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements a periodic syncpoint feature for the Kafka consumer, allowing it to maintain consistent upstream/downstream snapshot pairs in a downstream MySQL-compatible database. The implementation includes new configuration flags, a syncpoint storage layer, and writer logic to ensure events are flushed up to specific timestamps. Reviewer feedback suggests optimizing the database schema by using BIGINT UNSIGNED for timestamp columns to prevent truncation and improve query performance, removing a redundant sort in the DDL processing logic, and throttling the cleanup of old syncpoint records to reduce the performance impact on the downstream database.

return 0, err
}
query := fmt.Sprintf(
"SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY CAST(primary_ts AS UNSIGNED) DESC LIMIT 1",

medium

Using CAST(primary_ts AS UNSIGNED) in the ORDER BY clause prevents the database from using the index on the primary key (consumer_id, topic, primary_ts). If the column type is changed to BIGINT UNSIGNED (as suggested elsewhere), this cast can be removed, allowing for a much more efficient index-backed sort.
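
As a rough illustration of why the cast exists in the first place, the sketch below compares string ordering (how a varchar column sorts) with numeric ordering (how a BIGINT UNSIGNED column sorts). The sample values and the helper names `latestLexicographic` and `latestNumeric` are hypothetical and are not part of this PR.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// latestLexicographic returns the largest value under string comparison,
// which is how a varchar column is ordered when no CAST is applied.
// It returns "" for an empty input.
func latestLexicographic(ts []string) string {
	if len(ts) == 0 {
		return ""
	}
	sorted := append([]string(nil), ts...)
	sort.Strings(sorted)
	return sorted[len(sorted)-1]
}

// latestNumeric returns the largest value under numeric comparison,
// which is how a BIGINT UNSIGNED column is ordered.
func latestNumeric(ts []string) (uint64, error) {
	var best uint64
	for _, s := range ts {
		v, err := strconv.ParseUint(s, 10, 64)
		if err != nil {
			return 0, err
		}
		if v > best {
			best = v
		}
	}
	return best, nil
}

func main() {
	// Hypothetical values: one 18-digit and one 19-digit timestamp.
	ts := []string{"999999999999999999", "1000000000000000000"}
	fmt.Println("string order picks: ", latestLexicographic(ts))
	n, err := latestNumeric(ts)
	if err != nil {
		panic(err)
	}
	fmt.Println("numeric order picks:", n)
}
```

The two orderings agree only while every stored value has the same number of digits. With a numeric column type the comparison is correct without a cast.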

Suggested change
- "SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY CAST(primary_ts AS UNSIGNED) DESC LIMIT 1",
+ "SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY primary_ts DESC LIMIT 1",

Comment on lines +122 to +123
primary_ts varchar(18),
secondary_ts varchar(18),

medium

The primary_ts and secondary_ts columns are defined as varchar(18). However, a 64-bit unsigned integer (TSO) can have up to 20 digits. Using varchar(18) may lead to truncation or insertion errors as TSOs grow over time. Additionally, storing numeric timestamps as strings makes sorting and range queries inefficient. It is recommended to use BIGINT UNSIGNED for these columns to ensure future-proofing and better performance.

Suggested change
- primary_ts varchar(18),
- secondary_ts varchar(18),
+ primary_ts BIGINT UNSIGNED,
+ secondary_ts BIGINT UNSIGNED,

Comment on lines +182 to +192
if s.retention > 0 {
cleanupQuery := fmt.Sprintf(
"DELETE IGNORE FROM %s.%s WHERE consumer_id = ? AND topic = ? AND created_at < (NOW() - INTERVAL %d SECOND)",
filter.TiCDCSystemSchema,
consumerSyncpointTable,
int64(s.retention.Seconds()),
)
if _, err = tx.ExecContext(ctx, cleanupQuery, s.consumerID, s.topic); err != nil {
log.Warn("cleanup stale consumer syncpoint records failed", zap.Error(err))
}
}

medium

The cleanup of stale syncpoint records is performed on every Write call within the same transaction. This is inefficient and can increase the latency of syncpoint writes, especially if the table contains many records. It is recommended to throttle this cleanup logic (e.g., run it only once per hour) to reduce the load on the downstream database, similar to the implementation in the standard MySQL sink.

Comment on lines +405 to +409
if len(w.ddlList) > 1 {
sort.SliceStable(w.ddlList, func(i, j int) bool {
return w.ddlList[i].GetCommitTs() < w.ddlList[j].GetCommitTs()
})
}

medium

This sorting of w.ddlList is redundant because w.ddlList is already sorted by CommitTs in the Write method (line 568) before maybeFlushConsumerSyncpoint (and subsequently flushDDLEventsUpTo) is called. Removing this redundant sort can slightly improve performance in the consumer loop.
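
If the ordering guarantee is ever in doubt, a cheaper alternative to re-sorting is a linear check with `sort.SliceIsSorted`. The sketch below uses a hypothetical `event` type and `sortIfNeeded` helper in place of the PR's DDL event type.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical stand-in for a DDL event with a commit timestamp.
type event struct{ commitTs uint64 }

// sortIfNeeded sorts events by commitTs only when they are not already
// ordered, and reports whether a sort was performed.
func sortIfNeeded(events []event) bool {
	less := func(i, j int) bool { return events[i].commitTs < events[j].commitTs }
	if sort.SliceIsSorted(events, less) {
		return false
	}
	sort.SliceStable(events, less)
	return true
}

func main() {
	ordered := []event{{1}, {2}, {3}}
	fmt.Println(sortIfNeeded(ordered)) // false: already sorted, nothing to do

	shuffled := []event{{3}, {1}, {2}}
	fmt.Println(sortIfNeeded(shuffled), shuffled) // true [{1} {2} {3}]
}
```

The check is a single pass over the slice, so it costs less than a sort when the input is already ordered, as it is expected to be here.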

@ti-chi-bot

ti-chi-bot Bot commented May 13, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Labels

  • do-not-merge/needs-linked-issue
  • do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

1 participant