diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index d6d3e65058..40f86cb274 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -23,6 +23,7 @@ import ( "os/signal" "runtime/debug" "syscall" + "time" "github.com/google/uuid" "github.com/pingcap/log" @@ -61,6 +62,9 @@ func main() { flag.StringVar(&consumerOption.ca, "ca", "", "CA certificate path for Kafka SSL connection") flag.StringVar(&consumerOption.cert, "cert", "", "Certificate path for Kafka SSL connection") flag.StringVar(&consumerOption.key, "key", "", "Private key path for Kafka SSL connection") + flag.BoolVar(&consumerOption.enableSyncpoint, "enable-syncpoint", false, "enable periodic aligned syncpoint records in downstream") + flag.DurationVar(&consumerOption.syncpointInterval, "syncpoint-interval", 10*time.Minute, "interval used to align downstream syncpoint records") + flag.DurationVar(&consumerOption.syncpointRetention, "syncpoint-retention", 24*time.Hour, "retention used to clean old downstream syncpoint records") flag.Parse() err := logger.InitLogger(&logger.Config{ diff --git a/cmd/kafka-consumer/option.go b/cmd/kafka-consumer/option.go index ae7ba1f198..360d6b9f4b 100644 --- a/cmd/kafka-consumer/option.go +++ b/cmd/kafka-consumer/option.go @@ -18,6 +18,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/pingcap/log" "github.com/pingcap/ticdc/cmd/util" @@ -56,6 +57,10 @@ type option struct { upstreamTiDBDSN string enableTableAcrossNodes bool + + enableSyncpoint bool + syncpointInterval time.Duration + syncpointRetention time.Duration } func newOption() *option { @@ -161,6 +166,10 @@ func (o *option) Adjust(upstreamURIStr string, configFile string) { o.codecConfig.AvroEnableWatermark = true } o.enableTableAcrossNodes = putil.GetOrZero(replicaConfig.Scheduler.EnableTableAcrossNodes) + if o.enableSyncpoint && o.syncpointInterval <= 0 { + log.Panic("syncpoint interval must be positive when syncpoint is enabled", + 
zap.Duration("syncpointInterval", o.syncpointInterval)) + } log.Info("consumer option adjusted", zap.String("address", strings.Join(o.address, ",")), @@ -173,5 +182,8 @@ func (o *option) Adjust(upstreamURIStr string, configFile string) { zap.Int("maxBatchSize", o.maxBatchSize), zap.String("configFile", configFile), zap.String("upstreamURI", upstreamURI.String()), - zap.String("downstreamURI", o.downstreamURI)) + zap.String("downstreamURI", o.downstreamURI), + zap.Bool("enableSyncpoint", o.enableSyncpoint), + zap.Duration("syncpointInterval", o.syncpointInterval), + zap.Duration("syncpointRetention", o.syncpointRetention)) } diff --git a/cmd/kafka-consumer/syncpoint.go b/cmd/kafka-consumer/syncpoint.go new file mode 100644 index 0000000000..073d325e0c --- /dev/null +++ b/cmd/kafka-consumer/syncpoint.go @@ -0,0 +1,206 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "context" + "database/sql" + "fmt" + "net/url" + "strconv" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + commonType "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config" + cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/filter" + mysqlcfg "github.com/pingcap/ticdc/pkg/sink/mysql" + "go.uber.org/zap" +) + +const consumerSyncpointTable = "consumer_syncpoint_v1" + +type consumerSyncpointStore interface { + Init(ctx context.Context) (uint64, error) + Write(ctx context.Context, primaryTs uint64) error + Close() error +} + +type consumerSyncpointStoreConfig struct { + downstreamURI string + consumerID string + topic string + retention time.Duration +} + +type mysqlConsumerSyncpointStore struct { + db *sql.DB + consumerID string + topic string + retention time.Duration +} + +func newMySQLConsumerSyncpointStore( + ctx context.Context, + cfg consumerSyncpointStoreConfig, +) (consumerSyncpointStore, error) { + sinkURI, err := url.Parse(cfg.downstreamURI) + if err != nil { + return nil, cerrors.WrapError(cerrors.ErrSinkURIInvalid, err) + } + scheme := config.GetScheme(sinkURI) + if !config.IsMySQLCompatibleScheme(scheme) { + return nil, cerrors.ErrInvalidReplicaConfig.FastGenByArgs( + "consumer syncpoint requires a tidb or mysql downstream") + } + changefeedID := commonType.NewChangeFeedIDWithName(cfg.consumerID, commonType.DefaultKeyspaceName) + changefeedConfig := &config.ChangefeedConfig{ + ChangefeedID: changefeedID, + SinkURI: cfg.downstreamURI, + SinkConfig: config.GetDefaultReplicaConfig().Sink, + } + changefeedConfig.SinkConfig.TiDBSourceID = 1 + _, db, err := mysqlcfg.NewMysqlConfigAndDB(ctx, changefeedID, sinkURI, changefeedConfig) + if err != nil { + return nil, errors.Trace(err) + } + return &mysqlConsumerSyncpointStore{ + db: db, + consumerID: cfg.consumerID, + topic: cfg.topic, + retention: cfg.retention, + }, nil +} + +func (s 
*mysqlConsumerSyncpointStore) Init(ctx context.Context) (uint64, error) { + if err := s.createTable(ctx); err != nil { + return 0, err + } + query := fmt.Sprintf( + "SELECT primary_ts FROM %s.%s WHERE consumer_id = ? AND topic = ? ORDER BY CAST(primary_ts AS UNSIGNED) DESC LIMIT 1", + filter.TiCDCSystemSchema, + consumerSyncpointTable, + ) + var primaryTs string + err := s.db.QueryRowContext(ctx, query, s.consumerID, s.topic).Scan(&primaryTs) + if err == sql.ErrNoRows { + return 0, nil + } + if err != nil { + return 0, cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + ts, err := strconv.ParseUint(primaryTs, 10, 64) + if err != nil { + return 0, cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + return ts, nil +} + +func (s *mysqlConsumerSyncpointStore) createTable(ctx context.Context) error { + createDatabaseQuery := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", filter.TiCDCSystemSchema) + if _, err := s.db.ExecContext(ctx, createDatabaseQuery); err != nil { + return cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + createTableQuery := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s + ( + ticdc_cluster_id varchar(255), + consumer_id varchar(255), + topic varchar(255), + primary_ts varchar(18), + secondary_ts varchar(18), + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + INDEX (created_at), + PRIMARY KEY (consumer_id, topic, primary_ts) + );`, filter.TiCDCSystemSchema, consumerSyncpointTable) + if _, err := s.db.ExecContext(ctx, createTableQuery); err != nil { + return cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + return nil +} + +func (s *mysqlConsumerSyncpointStore) Write(ctx context.Context, primaryTs uint64) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + committed := false + defer func() { + if !committed { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + log.Warn("rollback 
consumer syncpoint transaction failed", zap.Error(rollbackErr)) + } + } + }() + + secondaryTs := "0" + gotSecondaryTs := true + if err = tx.QueryRowContext(ctx, "select @@tidb_current_ts").Scan(&secondaryTs); err != nil { + gotSecondaryTs = false + log.Warn("get downstream tidb current ts failed, use zero secondary ts", + zap.Uint64("primaryTs", primaryTs), zap.Error(err)) + } + + insertQuery := fmt.Sprintf( + "INSERT IGNORE INTO %s.%s (ticdc_cluster_id, consumer_id, topic, primary_ts, secondary_ts) VALUES (?, ?, ?, ?, ?)", + filter.TiCDCSystemSchema, + consumerSyncpointTable, + ) + if _, err = tx.ExecContext(ctx, insertQuery, + config.GetGlobalServerConfig().ClusterID, + s.consumerID, + s.topic, + strconv.FormatUint(primaryTs, 10), + secondaryTs, + ); err != nil { + return cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + + if gotSecondaryTs { + setExternalTsQuery := fmt.Sprintf("SET GLOBAL tidb_external_ts = %s", secondaryTs) + if _, err = tx.ExecContext(ctx, setExternalTsQuery); err != nil { + if cerrors.IsSyncPointIgnoreError(err) { + log.Warn("set global external ts failed, ignore this error", zap.Error(err)) + } else { + return cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + } + } + + if s.retention > 0 { + cleanupQuery := fmt.Sprintf( + "DELETE IGNORE FROM %s.%s WHERE consumer_id = ? AND topic = ? 
AND created_at < (NOW() - INTERVAL %d SECOND)", + filter.TiCDCSystemSchema, + consumerSyncpointTable, + int64(s.retention.Seconds()), + ) + if _, err = tx.ExecContext(ctx, cleanupQuery, s.consumerID, s.topic); err != nil { + log.Warn("cleanup stale consumer syncpoint records failed", zap.Error(err)) + } + } + + if err = tx.Commit(); err != nil { + return cerrors.WrapError(cerrors.ErrMySQLTxnError, errors.Trace(err)) + } + committed = true + return nil +} + +func (s *mysqlConsumerSyncpointStore) Close() error { + if s == nil || s.db == nil { + return nil + } + return s.db.Close() +} diff --git a/cmd/kafka-consumer/syncpoint_test.go b/cmd/kafka-consumer/syncpoint_test.go new file mode 100644 index 0000000000..d176422143 --- /dev/null +++ b/cmd/kafka-consumer/syncpoint_test.go @@ -0,0 +1,49 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/require" +) + +func TestMysqlConsumerSyncpointStoreWriteReadsCurrentTsInTxn(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + store := &mysqlConsumerSyncpointStore{ + db: db, + consumerID: "consumer-1", + topic: "topic-1", + } + + mock.ExpectBegin() + mock.ExpectQuery("select @@tidb_current_ts"). 
+ WillReturnRows(sqlmock.NewRows([]string{"@@tidb_current_ts"}).AddRow("456")) + mock.ExpectExec("INSERT IGNORE INTO tidb_cdc.consumer_syncpoint_v1 (ticdc_cluster_id, consumer_id, topic, primary_ts, secondary_ts) VALUES (?, ?, ?, ?, ?)"). + WithArgs("default", "consumer-1", "topic-1", "123", "456"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET GLOBAL tidb_external_ts = 456"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + require.NoError(t, store.Write(context.Background(), 123)) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index ac073a2958..7e5eb3cbd8 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -18,6 +18,7 @@ import ( "database/sql" "math" "sort" + "sync" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -35,6 +36,7 @@ import ( timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -79,6 +81,8 @@ type writer struct { progresses []*partitionProgress ddlList []*commonEvent.DDLEvent ddlWithMaxCommitTs map[int64]uint64 + mu sync.Mutex + pendingDMLFlushes []*dmlFlushTracker // this should be used by the canal-json, avro and open protocol partitionTableAccessor *common.PartitionTableAccessor @@ -89,6 +93,21 @@ type writer struct { maxBatchSize int mysqlSink sink.Sink enableTableAcrossNodes bool + + syncpointEnabled bool + syncpointInterval time.Duration + nextSyncpointTs uint64 + lastSyncedSyncpointTs uint64 + consumerSyncpointStore consumerSyncpointStore +} + +type dmlFlushTracker struct { + tableID int64 + group *util.EventsGroup + maxCommitTs uint64 + total int64 + done chan struct{} + flushed atomic.Int64 } func newWriter(ctx context.Context, o *option) *writer { @@ -101,6 +120,8 @@ func newWriter(ctx context.Context, o *option) *writer { 
ddlList: make([]*commonEvent.DDLEvent, 0), ddlWithMaxCommitTs: make(map[int64]uint64), enableTableAcrossNodes: o.enableTableAcrossNodes, + syncpointEnabled: o.enableSyncpoint, + syncpointInterval: o.syncpointInterval, } var ( db *sql.DB @@ -141,90 +162,53 @@ func newWriter(ctx context.Context, o *option) *writer { if err != nil { log.Panic("cannot create the mysql sink", zap.Error(err)) } + if o.enableSyncpoint { + store, err := newMySQLConsumerSyncpointStore(ctx, consumerSyncpointStoreConfig{ + downstreamURI: o.downstreamURI, + consumerID: o.groupID, + topic: o.topic, + retention: o.syncpointRetention, + }) + if err != nil { + log.Panic("cannot create consumer syncpoint store", zap.Error(err)) + } + lastSyncpointTs, err := store.Init(ctx) + if err != nil { + log.Panic("cannot initialize consumer syncpoint store", zap.Error(err)) + } + w.consumerSyncpointStore = store + w.lastSyncedSyncpointTs = lastSyncpointTs + if lastSyncpointTs > 0 { + w.nextSyncpointTs = nextAlignedSyncpointTs(lastSyncpointTs, o.syncpointInterval) + } + log.Info("consumer syncpoint initialized", + zap.Uint64("lastSyncedSyncpointTs", w.lastSyncedSyncpointTs), + zap.Uint64("nextSyncpointTs", w.nextSyncpointTs), + zap.Duration("syncpointInterval", o.syncpointInterval)) + } return w } func (w *writer) run(ctx context.Context) error { + if w.consumerSyncpointStore != nil { + defer func() { + if err := w.consumerSyncpointStore.Close(); err != nil { + log.Warn("close consumer syncpoint store failed", zap.Error(err)) + } + }() + } return w.mysqlSink.Run(ctx) } func (w *writer) flushDDLEvent(ctx context.Context, ddl *commonEvent.DDLEvent) error { - var ( - done = make(chan struct{}, 1) - - total int - flushed atomic.Int64 - ) - tableIDs := w.getBlockTableIDs(ddl) commitTs := ddl.GetCommitTs() - resolvedEvents := make([]*commonEvent.DMLEvent, 0) - // resolvedGroups records which EventsGroup has flushed events so we can - // advance its AppliedWatermark after the flush is fully finished. 
- resolvedGroups := make([]struct { - group *util.EventsGroup - maxCommitTs uint64 - }, 0) - for tableID := range tableIDs { - for _, progress := range w.progresses { - g, ok := progress.eventsGroup[tableID] - if !ok { - continue - } - before := len(resolvedEvents) - resolvedEvents = g.ResolveInto(commitTs, resolvedEvents) - resolvedCount := len(resolvedEvents) - before - if resolvedCount == 0 { - continue - } - - resolvedGroups = append(resolvedGroups, struct { - group *util.EventsGroup - maxCommitTs uint64 - }{ - group: g, - maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(), - }) - total += resolvedCount - } - } - - if total == 0 { - return w.mysqlSink.WriteBlockEvent(ddl) - } - for _, e := range resolvedEvents { - e.AddPostFlushFunc(func() { - if flushed.Inc() == int64(total) { - close(done) - } - }) - w.mysqlSink.AddDMLEvent(e) - } - - log.Info("flush DML events before DDL", zap.Uint64("DDLCommitTs", commitTs), zap.Int("total", total)) - start := time.Now() - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return context.Cause(ctx) - case <-done: - log.Info("flush DML events before DDL done", zap.Uint64("DDLCommitTs", commitTs), - zap.Int("total", total), zap.Duration("duration", time.Since(start)), - zap.Any("tables", tableIDs)) - for _, item := range resolvedGroups { - if item.maxCommitTs > item.group.AppliedWatermark { - item.group.AppliedWatermark = item.maxCommitTs - } - } - return w.mysqlSink.WriteBlockEvent(ddl) - case <-ticker.C: - log.Warn("DML events cannot be flushed in time", - zap.Uint64("DDLCommitTs", commitTs), zap.String("query", ddl.Query), - zap.Int("total", total), zap.Int64("flushed", flushed.Load())) - } + trackers := w.dispatchDMLEventsByTargetTs(commitTs, tableIDs) + trackers = append(trackers, w.pendingDMLFlushesUpTo(commitTs, tableIDs)...) 
+ if err := w.waitDMLFlushes(ctx, trackers, "DDL", commitTs); err != nil { + return err } + return w.mysqlSink.WriteBlockEvent(ddl) } func (w *writer) getBlockTableIDs(ddl *commonEvent.DDLEvent) map[int64]struct{} { @@ -286,76 +270,218 @@ func (w *writer) globalWatermark() uint64 { } func (w *writer) flushDMLEventsByWatermark(ctx context.Context) error { - var ( - done = make(chan struct{}, 1) + return w.flushDMLEventsByTargetTs(ctx, w.globalWatermark()) +} + +func (w *writer) flushDMLEventsByTargetTs(ctx context.Context, targetTs uint64) error { + trackers := w.dispatchDMLEventsByTargetTs(targetTs, nil) + trackers = append(trackers, w.pendingDMLFlushesUpTo(targetTs, nil)...) + return w.waitDMLFlushes(ctx, trackers, "watermark", targetTs) +} - total int - flushed atomic.Int64 +func (w *writer) dispatchDMLEventsByTargetTs(targetTs uint64, tableIDs map[int64]struct{}) []*dmlFlushTracker { + var ( + total int + trackers []*dmlFlushTracker ) - watermark := w.globalWatermark() - resolvedEvents := make([]*commonEvent.DMLEvent, 0) - // resolvedGroups records which EventsGroup has flushed events so we can - // advance its AppliedWatermark after the flush is fully finished. 
- resolvedGroups := make([]struct { - group *util.EventsGroup - maxCommitTs uint64 - }, 0) for _, p := range w.progresses { - for _, group := range p.eventsGroup { + for tableID, group := range p.eventsGroup { + if tableIDs != nil { + if _, ok := tableIDs[tableID]; !ok { + continue + } + } + resolvedEvents := make([]*commonEvent.DMLEvent, 0) before := len(resolvedEvents) - resolvedEvents = group.ResolveInto(watermark, resolvedEvents) + resolvedEvents = group.ResolveInto(targetTs, resolvedEvents) resolvedCount := len(resolvedEvents) - before if resolvedCount == 0 { continue } - resolvedGroups = append(resolvedGroups, struct { - group *util.EventsGroup - maxCommitTs uint64 - }{ + tracker := &dmlFlushTracker{ + tableID: tableID, group: group, maxCommitTs: resolvedEvents[len(resolvedEvents)-1].GetCommitTs(), - }) + total: int64(resolvedCount), + done: make(chan struct{}), + } + w.trackDMLFlush(tracker) + trackers = append(trackers, tracker) total += resolvedCount + + for _, e := range resolvedEvents { + e.AddPostFlushFunc(func() { + if tracker.flushed.Inc() == tracker.total { + w.mu.Lock() + if tracker.maxCommitTs > tracker.group.AppliedWatermark { + tracker.group.AppliedWatermark = tracker.maxCommitTs + } + w.mu.Unlock() + close(tracker.done) + } + }) + w.mysqlSink.AddDMLEvent(e) + log.Debug("dispatch DML event", zap.Int64("tableID", e.GetTableID()), + zap.Uint64("commitTs", e.GetCommitTs()), zap.Any("startTs", e.GetStartTs())) + } } } - if total == 0 { - return nil + + if total > 0 { + log.Info("dispatch DML events by watermark", zap.Uint64("watermark", targetTs), zap.Int("total", total)) } - for _, e := range resolvedEvents { - e.AddPostFlushFunc(func() { - if flushed.Inc() == int64(total) { - close(done) + return trackers +} + +func (w *writer) trackDMLFlush(tracker *dmlFlushTracker) { + w.mu.Lock() + defer w.mu.Unlock() + w.pendingDMLFlushes = append(w.pendingDMLFlushes, tracker) +} + +func (w *writer) pendingDMLFlushesUpTo(targetTs uint64, tableIDs 
map[int64]struct{}) []*dmlFlushTracker { + w.mu.Lock() + defer w.mu.Unlock() + + trackers := make([]*dmlFlushTracker, 0) + for _, tracker := range w.pendingDMLFlushes { + if tracker.maxCommitTs > targetTs { + continue + } + if tableIDs != nil { + if _, ok := tableIDs[tracker.tableID]; !ok { + continue } - }) - w.mysqlSink.AddDMLEvent(e) - log.Debug("flush DML event", zap.Int64("tableID", e.GetTableID()), - zap.Uint64("commitTs", e.GetCommitTs()), zap.Any("startTs", e.GetStartTs())) + } + trackers = append(trackers, tracker) + } + return trackers +} + +func (w *writer) cleanupDoneDMLFlushes() { + w.mu.Lock() + defer w.mu.Unlock() + + remaining := w.pendingDMLFlushes[:0] + for _, tracker := range w.pendingDMLFlushes { + select { + case <-tracker.done: + default: + remaining = append(remaining, tracker) + } + } + clear(w.pendingDMLFlushes[len(remaining):]) + w.pendingDMLFlushes = remaining +} + +func (w *writer) waitDMLFlushes( + ctx context.Context, + trackers []*dmlFlushTracker, + reason string, + targetTs uint64, +) error { + if len(trackers) == 0 { + return nil + } + seen := make(map[*dmlFlushTracker]struct{}, len(trackers)) + uniqueTrackers := trackers[:0] + var total int64 + for _, tracker := range trackers { + if _, ok := seen[tracker]; ok { + continue + } + seen[tracker] = struct{}{} + uniqueTrackers = append(uniqueTrackers, tracker) + total += tracker.total } - log.Info("flush DML events by watermark", zap.Uint64("watermark", watermark), zap.Int("total", total)) + log.Info("wait DML events flushed", zap.String("reason", reason), zap.Uint64("targetTs", targetTs), + zap.Int("groups", len(uniqueTrackers)), zap.Int64("total", total)) start := time.Now() ticker := time.NewTicker(time.Minute) defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return context.Cause(ctx) - case <-done: - log.Info("flush DML events done", zap.Uint64("watermark", watermark), - zap.Int("total", total), zap.Duration("duration", time.Since(start))) - for _, item := range 
resolvedGroups { - if item.maxCommitTs > item.group.AppliedWatermark { - item.group.AppliedWatermark = item.maxCommitTs + + for _, tracker := range uniqueTrackers { + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case <-tracker.done: + goto nextTracker + case <-ticker.C: + var flushed int64 + for _, item := range uniqueTrackers { + flushed += item.flushed.Load() } + log.Warn("DML events cannot be flushed in time", zap.String("reason", reason), + zap.Uint64("targetTs", targetTs), zap.Int64("total", total), zap.Int64("flushed", flushed)) } - return nil - case <-ticker.C: - log.Warn("DML events cannot be flushed in time", zap.Uint64("watermark", watermark), - zap.Int("total", total), zap.Int64("flushed", flushed.Load())) } + nextTracker: } + + log.Info("DML events flushed", zap.String("reason", reason), zap.Uint64("targetTs", targetTs), + zap.Int64("total", total), zap.Duration("duration", time.Since(start))) + w.cleanupDoneDMLFlushes() + return nil +} + +func (w *writer) flushDDLEventsUpTo(ctx context.Context, targetTs uint64) { + if len(w.ddlList) > 1 { + sort.SliceStable(w.ddlList, func(i, j int) bool { + return w.ddlList[i].GetCommitTs() < w.ddlList[j].GetCommitTs() + }) + } + remaining := make([]*commonEvent.DDLEvent, 0, len(w.ddlList)) + for _, ddl := range w.ddlList { + if ddl.GetCommitTs() > targetTs { + remaining = append(remaining, ddl) + continue + } + if err := w.flushDDLEvent(ctx, ddl); err != nil { + log.Panic("write DDL event failed", zap.Error(err), + zap.String("DDL", ddl.Query), zap.Uint64("commitTs", ddl.GetCommitTs())) + } + } + w.ddlList = remaining +} + +func (w *writer) maybeFlushConsumerSyncpoint(ctx context.Context, watermark uint64) (bool, error) { + if !w.syncpointEnabled || w.consumerSyncpointStore == nil || w.syncpointInterval <= 0 || watermark == 0 { + return false, nil + } + if w.nextSyncpointTs == 0 { + w.nextSyncpointTs = calculateFloorAlignedSyncpointTs(watermark, w.syncpointInterval) + if w.nextSyncpointTs == 0 
{ + return false, nil + } + } + flushed := false + for w.nextSyncpointTs != 0 && w.nextSyncpointTs <= watermark { + targetTs := w.nextSyncpointTs + if targetTs <= w.lastSyncedSyncpointTs { + w.nextSyncpointTs = nextAlignedSyncpointTs(targetTs, w.syncpointInterval) + continue + } + w.flushDDLEventsUpTo(ctx, targetTs) + trackers := w.dispatchDMLEventsByTargetTs(targetTs, nil) + trackers = append(trackers, w.pendingDMLFlushesUpTo(targetTs, nil)...) + if err := w.waitDMLFlushes(ctx, trackers, "consumer syncpoint", targetTs); err != nil { + return false, err + } + if err := w.consumerSyncpointStore.Write(ctx, targetTs); err != nil { + return false, err + } + flushed = true + w.lastSyncedSyncpointTs = targetTs + w.nextSyncpointTs = nextAlignedSyncpointTs(targetTs, w.syncpointInterval) + log.Info("consumer syncpoint flushed", + zap.Uint64("syncpointTs", targetTs), + zap.Uint64("nextSyncpointTs", w.nextSyncpointTs)) + } + return flushed, nil } // WriteMessage is to decode kafka message to event. @@ -477,6 +603,14 @@ func (w *writer) Write(ctx context.Context, messageType common.MessageType) bool } watermark := w.globalWatermark() + consumerSyncpointFlushed := false + if messageType == common.MessageTypeResolved && w.syncpointEnabled { + var err error + consumerSyncpointFlushed, err = w.maybeFlushConsumerSyncpoint(ctx, watermark) + if err != nil { + log.Panic("flush consumer syncpoint failed", zap.Error(err), zap.Uint64("watermark", watermark)) + } + } ddlList := make([]*commonEvent.DDLEvent, 0) for i, todoDDL := range w.ddlList { // DDL ordering must follow commitTs (see appendDDL). Traditionally we wait until the global @@ -518,10 +652,18 @@ func (w *writer) Write(ctx context.Context, messageType common.MessageType) bool } if messageType == common.MessageTypeResolved { - // since watermark is broadcast to all partitions, so that each partition can flush events individually. 
- err := w.flushDMLEventsByWatermark(ctx) - if err != nil { - log.Panic("flush dml events by the watermark failed", zap.Error(err)) + // With consumer syncpoint enabled, normal resolved events should keep the + // consumer moving: dispatch DMLs to the sink and only wait at aligned + // syncpoint barriers. Without syncpoint, keep the historical behavior so + // offset commits only happen after DMLs are flushed. + if w.syncpointEnabled { + w.dispatchDMLEventsByTargetTs(watermark, nil) + } else { + // since watermark is broadcast to all partitions, so that each partition can flush events individually. + err := w.flushDMLEventsByWatermark(ctx) + if err != nil { + log.Panic("flush dml events by the watermark failed", zap.Error(err)) + } } } @@ -533,9 +675,40 @@ func (w *writer) Write(ctx context.Context, messageType common.MessageType) bool zap.Int("length", len(w.ddlList))) return false } + if messageType == common.MessageTypeResolved && w.syncpointEnabled { + return consumerSyncpointFlushed + } return true } +func calculateAlignedSyncpointTs(ts uint64, interval time.Duration, skipSyncpointAtTs bool) uint64 { + if interval <= 0 { + return 0 + } + k := oracle.GetTimeFromTS(ts).Sub(time.Unix(0, 0)) / interval + if oracle.GetTimeFromTS(ts).Sub(time.Unix(0, 0))%interval != 0 || oracle.ExtractLogical(ts) != 0 { + k++ + } else if skipSyncpointAtTs { + k++ + } + return oracle.GoTimeToTS(time.Unix(0, 0).Add(k * interval)) +} + +func nextAlignedSyncpointTs(ts uint64, interval time.Duration) uint64 { + return calculateAlignedSyncpointTs(ts, interval, true) +} + +func calculateFloorAlignedSyncpointTs(ts uint64, interval time.Duration) uint64 { + if interval <= 0 { + return 0 + } + k := oracle.GetTimeFromTS(ts).Sub(time.Unix(0, 0)) / interval + if k <= 0 { + return 0 + } + return oracle.GoTimeToTS(time.Unix(0, 0).Add(k * interval)) +} + func (w *writer) onDDL(ddl *commonEvent.DDLEvent) { switch w.protocol { case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro: 
@@ -602,25 +775,29 @@ func (w *writer) appendRow2Group(dml *commonEvent.DMLEvent, progress *partitionP group = util.NewEventsGroup(progress.partition, tableID) progress.eventsGroup[tableID] = group } + w.mu.Lock() + appliedWatermark := group.AppliedWatermark + highWatermark := group.HighWatermark + w.mu.Unlock() // IMPORTANT: Kafka offsets are append-only, but CommitTs can go backwards after // a TiCDC restart/retry (at-least-once replay). We must not drop such events // solely based on a "seen" watermark (e.g. HighWatermark). The only safe // ignore condition is "already flushed to downstream". - if commitTs <= group.AppliedWatermark { + if commitTs <= appliedWatermark { log.Warn("DML event replayed after applied, ignore it", zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition), zap.Uint64("commitTs", commitTs), zap.Any("offset", offset), - zap.Uint64("appliedWatermark", group.AppliedWatermark), zap.Uint64("highWatermark", group.HighWatermark), + zap.Uint64("appliedWatermark", appliedWatermark), zap.Uint64("highWatermark", highWatermark), zap.Uint64("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), zap.String("schema", schema), zap.String("table", table), zap.Any("protocol", w.protocol)) return } - forceInsert := commitTs < group.HighWatermark || commitTs < progress.watermark || w.enableTableAcrossNodes + forceInsert := commitTs < highWatermark || commitTs < progress.watermark || w.enableTableAcrossNodes if forceInsert { log.Warn("DML event commit ts fallback, append with forceInsert", zap.Int32("partition", group.Partition), zap.Any("offset", offset), - zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark), - zap.Uint64("appliedWatermark", group.AppliedWatermark), + zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", highWatermark), + zap.Uint64("appliedWatermark", appliedWatermark), zap.Uint64("partitionWatermark", progress.watermark), 
zap.Any("watermarkOffset", progress.watermarkOffset), zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID), zap.Stringer("eventType", dml.RowTypes[0]), zap.Any("protocol", w.protocol),
diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go
index ec9aa34a43..8c801dc113 100644
--- a/cmd/kafka-consumer/writer_test.go
+++ b/cmd/kafka-consumer/writer_test.go
@@ -15,7 +15,9 @@ package main
 
 import (
 	"context"
+	"strconv"
 	"testing"
+	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/golang/mock/gomock"
@@ -29,6 +31,7 @@ import (
 	timodel "github.com/pingcap/tidb/pkg/meta/model"
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/oracle"
 )
 
 func newMockSink(t *testing.T) (*sinkmock.MockSink, *[]string) {
@@ -49,6 +52,52 @@ func newMockSink(t *testing.T) (*sinkmock.MockSink, *[]string) {
 	return s, &ddls
 }
 
+// recordingSyncpointStore is a test double that records every primaryTs passed to Write.
+// When actions is non-nil, each write is also appended to it as "syncpoint:<primaryTs>",
+// so tests can assert the ordering of syncpoint writes relative to other recorded actions.
+type recordingSyncpointStore struct {
+	writes  []uint64
+	actions *[]string
+}
+
+// Init always reports 0 and never fails.
+func (s *recordingSyncpointStore) Init(_ context.Context) (uint64, error) {
+	return 0, nil
+}
+
+// Write records primaryTs and, when an action log is attached, appends a "syncpoint:<primaryTs>" entry.
+func (s *recordingSyncpointStore) Write(_ context.Context, primaryTs uint64) error {
+	s.writes = append(s.writes, primaryTs)
+	if s.actions == nil {
+		return nil
+	}
+	*s.actions = append(*s.actions, "syncpoint:"+strconv.FormatUint(primaryTs, 10))
+	return nil
+}
+
+// Close is a no-op.
+func (s *recordingSyncpointStore) Close() error {
+	return nil
+}
+
+// newActionMockSink returns a mock sink that accepts any number of AddDMLEvent calls and
+// appends "ddl:<query>" to actions for every DDLEvent handed to WriteBlockEvent.
+// Other block events are accepted without being recorded.
+func newActionMockSink(t *testing.T, actions *[]string) *sinkmock.MockSink {
+	t.Helper()
+
+	ctrl := gomock.NewController(t)
+	s := sinkmock.NewMockSink(ctrl)
+	s.EXPECT().AddDMLEvent(gomock.Any()).AnyTimes()
+	s.EXPECT().WriteBlockEvent(gomock.Any()).DoAndReturn(func(event commonEvent.BlockEvent) error {
+		if ddl, ok := event.(*commonEvent.DDLEvent); ok {
+			*actions = append(*actions, "ddl:"+ddl.Query)
+		}
+		return nil
+	}).AnyTimes()
+	return s
+}
+
 func TestWriterWrite_executesIndependentCreateTableWithoutWatermark(t *testing.T) {
 	// Scenario: In some integration tests the upstream intentionally pauses dispatcher creation, which can
 	// stall resolved-ts (consumer watermark) below the commitTs of CREATE TABLE / CREATE DATABASE DDLs.
@@ -268,6 +317,139 @@ func TestWriterWrite_handlesOutOfOrderDDLsByCommitTs(t *testing.T) {
 	require.Equal(t, "CREATE TABLE `common_1`.`a` (`a` BIGINT PRIMARY KEY,`b` INT)", w.ddlList[0].Query)
 }
 
+func TestWriterWrite_consumerSyncpointWritesOnceAtAlignedWatermark(t *testing.T) {
+	// Scenario: every partition watermark already equals the next aligned syncpoint ts.
+	// Expectation: handling two resolved messages in a row writes that syncpoint exactly once,
+	// records it as lastSyncedSyncpointTs, and moves nextSyncpointTs past it so the repeated
+	// message does not produce a duplicate record.
+	ctx := context.Background()
+	actions := make([]string, 0)
+	store := &recordingSyncpointStore{actions: &actions}
+	s := newActionMockSink(t, &actions)
+	syncpointTs := oracle.GoTimeToTS(time.Unix(10, 0))
+	w := &writer{
+		progresses: []*partitionProgress{
+			{partition: 0, watermark: syncpointTs},
+			{partition: 1, watermark: syncpointTs},
+		},
+		mysqlSink:              s,
+		syncpointEnabled:       true,
+		syncpointInterval:      time.Second,
+		nextSyncpointTs:        syncpointTs,
+		consumerSyncpointStore: store,
+		lastSyncedSyncpointTs:  0,
+		ddlList:                make([]*commonEvent.DDLEvent, 0),
+		ddlWithMaxCommitTs:     make(map[int64]uint64),
+		enableTableAcrossNodes: false,
+		partitionTableAccessor: codecCommon.NewPartitionTableAccessor(),
+	}
+
+	w.Write(ctx, codecCommon.MessageTypeResolved)
+	w.Write(ctx, codecCommon.MessageTypeResolved)
+
+	require.Equal(t, []uint64{syncpointTs}, store.writes)
+	require.Equal(t, []string{"syncpoint:" + strconv.FormatUint(syncpointTs, 10)}, actions)
+	require.Equal(t, syncpointTs, w.lastSyncedSyncpointTs)
+	require.Greater(t, w.nextSyncpointTs, syncpointTs)
+}
+
+func TestWriterWrite_consumerSyncpointPreservesDDLOrdering(t *testing.T) {
+	// Scenario: a DDL whose FinishedTs lies between two aligned syncpoint timestamps is pending
+	// while every partition watermark has already reached the second syncpoint timestamp.
+	// Expectation: a single resolved message produces syncpoint, DDL, syncpoint in that order.
+	ctx := context.Background()
+	actions := make([]string, 0)
+	store := &recordingSyncpointStore{actions: &actions}
+	s := newActionMockSink(t, &actions)
+	firstSyncpointTs := oracle.GoTimeToTS(time.Unix(10, 0))
+	ddlTs := oracle.GoTimeToTS(time.Unix(10, int64(500*time.Millisecond)))
+	secondSyncpointTs := oracle.GoTimeToTS(time.Unix(11, 0))
+	w := &writer{
+		progresses: []*partitionProgress{
+			{partition: 0, watermark: secondSyncpointTs},
+			{partition: 1, watermark: secondSyncpointTs},
+		},
+		mysqlSink:              s,
+		syncpointEnabled:       true,
+		syncpointInterval:      time.Second,
+		nextSyncpointTs:        firstSyncpointTs,
+		consumerSyncpointStore: store,
+		ddlList: []*commonEvent.DDLEvent{
+			{
+				Query:      "ALTER TABLE `test`.`t` ADD COLUMN `c` INT",
+				SchemaName: "test",
+				TableName:  "t",
+				Type:       byte(timodel.ActionAddColumn),
+				FinishedTs: ddlTs,
+				BlockedTables: &commonEvent.InfluencedTables{
+					InfluenceType: commonEvent.InfluenceTypeNormal,
+					TableIDs:      []int64{1},
+				},
+			},
+		},
+		ddlWithMaxCommitTs: make(map[int64]uint64),
+	}
+
+	w.Write(ctx, codecCommon.MessageTypeResolved)
+
+	require.Equal(t, []string{
+		"syncpoint:" + strconv.FormatUint(firstSyncpointTs, 10),
+		"ddl:ALTER TABLE `test`.`t` ADD COLUMN `c` INT",
+		"syncpoint:" + strconv.FormatUint(secondSyncpointTs, 10),
+	}, actions)
+	require.Empty(t, w.ddlList)
+	require.Equal(t, []uint64{firstSyncpointTs, secondSyncpointTs}, store.writes)
+}
+
+func TestWriterWrite_consumerSyncpointDispatchesDMLWithoutWaitingBeforeBarrier(t *testing.T) {
+	// Scenario: the only partition watermark is still below the next syncpoint ts, and one DML
+	// with a commitTs below that watermark is buffered in the partition's events group.
+	// Expectation: the DML is handed to the sink and tracked in pendingDMLFlushes, the events
+	// group is drained, and Write returns needCommit == false.
+	ctx := context.Background()
+	ctrl := gomock.NewController(t)
+	s := sinkmock.NewMockSink(ctrl)
+	dispatched := make([]uint64, 0)
+	s.EXPECT().AddDMLEvent(gomock.Any()).DoAndReturn(func(event *commonEvent.DMLEvent) {
+		dispatched = append(dispatched, event.GetCommitTs())
+	}).AnyTimes()
+	s.EXPECT().WriteBlockEvent(gomock.Any()).AnyTimes()
+
+	dmlCommitTs := oracle.GoTimeToTS(time.Unix(10, 0))
+	watermark := oracle.GoTimeToTS(time.Unix(10, int64(500*time.Millisecond)))
+	nextSyncpointTs := oracle.GoTimeToTS(time.Unix(11, 0))
+	group := util.NewEventsGroup(0, 1)
+	group.Append(&commonEvent.DMLEvent{
+		PhysicalTableID: 1,
+		CommitTs:        dmlCommitTs,
+		RowTypes:        []common.RowType{common.RowTypeInsert},
+		Rows:            chunk.NewChunkWithCapacity(nil, 0),
+		TableInfo: &common.TableInfo{
+			TableName: common.TableName{Schema: "test", Table: "t"},
+		},
+	}, false)
+
+	w := &writer{
+		progresses: []*partitionProgress{
+			{partition: 0, watermark: watermark, eventsGroup: map[int64]*util.EventsGroup{1: group}},
+		},
+		mysqlSink:              s,
+		syncpointEnabled:       true,
+		syncpointInterval:      time.Second,
+		nextSyncpointTs:        nextSyncpointTs,
+		consumerSyncpointStore: &recordingSyncpointStore{},
+		ddlList:                make([]*commonEvent.DDLEvent, 0),
+		ddlWithMaxCommitTs:     make(map[int64]uint64),
+	}
+
+	needCommit := w.Write(ctx, codecCommon.MessageTypeResolved)
+
+	require.False(t, needCommit)
+	require.Equal(t, []uint64{dmlCommitTs}, dispatched)
+	require.Len(t, w.pendingDMLFlushes, 1)
+	require.Empty(t, group.GetAllEvents())
+}
+
 func TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied(t *testing.T) {
 	// Scenario:
 	// 1) TiCDC writes DML messages to Kafka in commitTs order.
diff --git a/tests/integration_tests/kafka_consumer_syncpoint/conf/changefeed.toml b/tests/integration_tests/kafka_consumer_syncpoint/conf/changefeed.toml
new file mode 100644
index 0000000000..f03aa76c48
--- /dev/null
+++ b/tests/integration_tests/kafka_consumer_syncpoint/conf/changefeed.toml
@@ -0,0 +1,8 @@
+[integrity]
+integrity-check-level = "correctness"
+corruption-handle-level = "error"
+
+[sink]
+send-bootstrap-interval-in-sec = 5
+send-bootstrap-in-msg-count = 100
+send-all-bootstrap-at-start = true
diff --git a/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_final.toml b/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_final.toml
new file mode 100644
index 0000000000..ebaf17e34d
--- /dev/null
+++ b/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_final.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/tidb_cdc_test/kafka_consumer_syncpoint/sync_diff/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql1"
+
+    target-check-tables = ["kafka_consumer_syncpoint.t"]
+
+[data-sources]
+[data-sources.tidb0]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
+
+[data-sources.mysql1]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_part1.toml b/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_part1.toml
new file mode 100644
index 0000000000..2f5ac0988e
--- /dev/null
+++ b/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_part1.toml
@@ -0,0 +1,23 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/tidb_cdc_test/kafka_consumer_syncpoint/sync_diff/output"
+
+    source-instances = ["tidb0"]
+
+    target-instance = "mysql1"
+
+    target-check-tables = ["kafka_consumer_syncpoint.t"]
+
+[data-sources]
+[data-sources.tidb0]
+    host = "127.0.0.1"
+    port = 4000
+    user = "root"
+    password = ""
diff --git a/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_part2.toml b/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_part2.toml
new file mode 100644
index 0000000000..906d3ac807
--- /dev/null
+++ b/tests/integration_tests/kafka_consumer_syncpoint/conf/diff_config_part2.toml
@@ -0,0 +1,6 @@
+
+[data-sources.mysql1]
+    host = "127.0.0.1"
+    port = 3306
+    user = "root"
+    password = ""
diff --git a/tests/integration_tests/kafka_consumer_syncpoint/run.sh b/tests/integration_tests/kafka_consumer_syncpoint/run.sh
new file mode 100755
index 0000000000..d1966046ea
--- /dev/null
+++ b/tests/integration_tests/kafka_consumer_syncpoint/run.sh
@@ -0,0 +1,115 @@
+#!/bin/bash
+
+# [DESCRIPTION]:
+#   Verify that kafka-consumer syncpoint records describe a consistent upstream/downstream snapshot pair.
+# [STEP]:
+#   1. Create a Kafka changefeed and consume it into downstream TiDB with --enable-syncpoint.
+#   2. Generate DML workload while the consumer periodically writes tidb_cdc.consumer_syncpoint_v1.
+#   3. Pick the latest consumer syncpoint and compare upstream primary_ts with downstream secondary_ts.
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+DB_NAME=kafka_consumer_syncpoint
+TABLE_NAME=t
+CHANGEFEED_ID=kafka-consumer-syncpoint
+CONSUMER_GROUP_ID=kafka-consumer-syncpoint-consumer
+
+# deployConfig writes $WORK_DIR/diff_config.toml with snapshot = $1 after part1 and snapshot = $2 after part2.
+function deployConfig() {
+	local primaryTs=$1
+	local secondaryTs=$2
+
+	cat $CUR/conf/diff_config_part1.toml >$WORK_DIR/diff_config.toml
+	echo " snapshot = \"$primaryTs\"" >>$WORK_DIR/diff_config.toml
+	cat $CUR/conf/diff_config_part2.toml >>$WORK_DIR/diff_config.toml
+	echo " snapshot = \"$secondaryTs\"" >>$WORK_DIR/diff_config.toml
+}
+
+# runWorkload creates the database and table upstream, then issues 20 inserts and 10 updates
+# one second apart, and finally creates finish_mark and checks that it exists downstream.
+function runWorkload() {
+	run_sql "CREATE DATABASE $DB_NAME;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql "CREATE TABLE $DB_NAME.$TABLE_NAME(id INT PRIMARY KEY, v INT, c VARCHAR(64));" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	check_table_exists "$DB_NAME.$TABLE_NAME" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+
+	for i in $(seq 1 20); do
+		run_sql "INSERT INTO $DB_NAME.$TABLE_NAME VALUES ($i, $i, 'value-$i');" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+		sleep 1
+	done
+
+	for i in $(seq 1 10); do
+		run_sql "UPDATE $DB_NAME.$TABLE_NAME SET v = v + 100 WHERE id = $i;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+		sleep 1
+	done
+
+	run_sql "CREATE TABLE $DB_NAME.finish_mark(id INT PRIMARY KEY);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	check_table_exists "$DB_NAME.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
+}
+
+function run() {
+	if [ "$SINK_TYPE" != "kafka" ]; then
+		echo "[$(date)] kafka_consumer_syncpoint only runs with kafka sink"
+		return
+	fi
+
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	run_sql "SET GLOBAL tidb_enable_external_ts_read = on;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	TOPIC_NAME="ticdc-kafka-consumer-syncpoint-$RANDOM"
+	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=simple"
+	DOWNSTREAM_URI="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false&enable-ddl-ts=false"
+
+	cdc_cli_changefeed create --sink-uri="$SINK_URI" --config="$CUR/conf/changefeed.toml" -c "$CHANGEFEED_ID"
+	sleep 5
+
+	cdc_kafka_consumer \
+		--upstream-uri "$SINK_URI" \
+		--downstream-uri "$DOWNSTREAM_URI" \
+		--upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" \
+		--config="$CUR/conf/changefeed.toml" \
+		--consumer-group-id="$CONSUMER_GROUP_ID" \
+		--enable-syncpoint \
+		--syncpoint-interval=2s \
+		--syncpoint-retention=1h \
+		--log-file "$WORK_DIR/cdc_kafka_consumer.log" \
+		--log-level debug >>"$WORK_DIR/cdc_kafka_consumer_stdout.log" 2>&1 &
+
+	runWorkload
+
+	ensure 60 "mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -N -s -e \"SELECT COUNT(*) FROM tidb_cdc.consumer_syncpoint_v1 WHERE consumer_id='$CONSUMER_GROUP_ID' AND topic='$TOPIC_NAME' AND secondary_ts <> '0';\" | grep -E '^[1-9][0-9]*$'"
+
+	syncpointRow=$(mysql -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} -uroot -N -s -e "SELECT primary_ts, secondary_ts FROM tidb_cdc.consumer_syncpoint_v1 WHERE consumer_id='$CONSUMER_GROUP_ID' AND topic='$TOPIC_NAME' AND secondary_ts <> '0' ORDER BY CAST(primary_ts AS UNSIGNED) DESC LIMIT 1;")
+	primaryTs=$(echo "$syncpointRow" | awk '{print $1}')
+	secondaryTs=$(echo "$syncpointRow" | awk '{print $2}')
+	echo "[$(date)] check kafka consumer syncpoint primary_ts: $primaryTs, secondary_ts: $secondaryTs"
+	# Both timestamps become snapshot values in the sync_diff config; fail fast on an empty
+	# value instead of running a comparison that is not pinned to the syncpoint pair.
+	if [ -z "$primaryTs" ] || [ -z "$secondaryTs" ]; then
+		echo "[$(date)] invalid kafka consumer syncpoint row: '$syncpointRow'"
+		exit 1
+	fi
+
+	run_sql "SET GLOBAL tidb_enable_external_ts_read = off;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+
+	deployConfig "$primaryTs" "$secondaryTs"
+	check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 60
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config_final.toml 60
+
+	killall cdc_kafka_consumer || true
+	cleanup_process $CDC_BINARY
+}
+
+trap 'killall cdc_kafka_consumer || true; stop_test $WORK_DIR' EXIT
+run "$@"
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"