From 3a1c206aca100496fbdb471da3843b207696cc5a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 14 May 2026 18:02:42 +0800 Subject: [PATCH 1/4] fix error import related --- logservice/logpuller/priority_queue.go | 2 +- logservice/logpuller/region_request_worker_test.go | 2 +- maintainer/maintainer_controller_test.go | 6 +++--- pkg/redo/writer/blackhole/writer.go | 2 +- pkg/txnutil/gc/gc_manager_test.go | 10 +++++----- pkg/util/external_storage_test.go | 2 +- pkg/util/test_helper_test.go | 2 +- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/logservice/logpuller/priority_queue.go b/logservice/logpuller/priority_queue.go index f19b6d34b9..c2838240f6 100644 --- a/logservice/logpuller/priority_queue.go +++ b/logservice/logpuller/priority_queue.go @@ -17,7 +17,7 @@ import ( "context" "sync" - "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/utils/heap" ) diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index f9752a0eb3..701af5ddc6 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -18,10 +18,10 @@ import ( "io" "testing" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/utils/dynstream" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index fe263c4a93..dbd18f7de8 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -29,7 +29,7 @@ import ( appcontext "github.com/pingcap/ticdc/pkg/common/context" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" - cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/eventservice" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/pdutil" @@ -1462,9 +1462,9 @@ func TestFinishBootstrapReturnsErrorWhenCheckpointMissing(t *testing.T) { }, false) require.Nil(t, postBootstrapRequest) require.Error(t, err) - code, ok := cerrors.RFCCode(err) + code, ok := errors.RFCCode(err) require.True(t, ok) - require.Equal(t, cerrors.ErrChangefeedInitTableTriggerDispatcherFailed.RFCCode(), code) + require.Equal(t, errors.ErrChangefeedInitTableTriggerDispatcherFailed.RFCCode(), code) require.Contains(t, err.Error(), "all bootstrap responses reported empty checkpointTs") require.False(t, controller.bootstrapped) } diff --git a/pkg/redo/writer/blackhole/writer.go b/pkg/redo/writer/blackhole/writer.go index ec276ac9f2..132ed47d3c 100644 --- a/pkg/redo/writer/blackhole/writer.go +++ b/pkg/redo/writer/blackhole/writer.go @@ -15,10 +15,10 @@ package blackhole import ( "context" - "errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/redo/writer" "go.uber.org/zap" ) diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 1b1761b827..5e22ceff87 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" - cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/stretchr/testify/require" ) @@ -48,9 +48,9 @@ func TestTryUpdateServiceGCSafepointReturnsErrorOnUpdateFailure(t *testing.T) { err := m.TryUpdateServiceGCSafepoint(ctx, common.Ts(100)) require.Error(t, err) - errCode, ok := cerrors.RFCCode(err) + errCode, ok := errors.RFCCode(err) require.True(t, ok) - require.Equal(t, cerrors.ErrUpdateServiceSafepointFailed.RFCCode(), errCode) + require.Equal(t, errors.ErrUpdateServiceSafepointFailed.RFCCode(), errCode) require.Equal(t, 1, updateCalls) } @@ -93,9 +93,9 @@ func TestTryUpdateServiceGCSafepointDoesNotReturnSnapshotLost(t *testing.T) { cfID := common.NewChangeFeedIDWithName("test-changefeed", "test") err := m.CheckStaleCheckpointTs(0, cfID, checkpointTs) require.Error(t, err) - errCode, ok := cerrors.RFCCode(err) + errCode, ok := errors.RFCCode(err) require.True(t, ok) - require.Equal(t, cerrors.ErrSnapshotLostByGC.RFCCode(), errCode) + require.Equal(t, errors.ErrSnapshotLostByGC.RFCCode(), errCode) } func TestTryDeleteServiceGCSafepointClearsCachedState(t *testing.T) { diff --git a/pkg/util/external_storage_test.go b/pkg/util/external_storage_test.go index d6483c45bc..0b0c199798 100644 --- a/pkg/util/external_storage_test.go +++ b/pkg/util/external_storage_test.go @@ -15,11 +15,11 @@ package util import ( "context" - "errors" "net/http" "testing" "time" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/stretchr/testify/require" ) diff --git a/pkg/util/test_helper_test.go b/pkg/util/test_helper_test.go index 75169afe32..778861ee65 100644 --- a/pkg/util/test_helper_test.go +++ b/pkg/util/test_helper_test.go @@ -15,11 +15,11 @@ package util import ( "context" - "errors" "sync/atomic" "testing" "time" + "github.com/pingcap/ticdc/pkg/errors" "github.com/stretchr/testify/require" ) From ca269c290351eac16470e78f473690489d6c8a1a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 14 May 2026 18:25:10 +0800 Subject: [PATCH 2/4] fix error import related --- coordinator/changefeed/backoff.go | 7 ++- coordinator/changefeed/etcd_backend.go | 15 +++--- pkg/api/util.go | 17 +++--- pkg/common/event/dml_event_test.go | 2 +- pkg/etcd/client.go | 10 ++-- pkg/migrate/migrate.go | 25 +++++---- pkg/orchestrator/reactor_state.go | 15 +++--- pkg/orchestrator/reactor_state_tester.go | 7 ++- pkg/sink/kafka/mock_factory.go | 8 +-- .../mysql_writer_ddl_index_rewrite_test.go | 8 +-- pkg/sink/mysql/mysql_writer_dml_test.go | 54 +++++++++---------- 11 files changed, 78 insertions(+), 90 deletions(-) diff --git a/coordinator/changefeed/backoff.go b/coordinator/changefeed/backoff.go index 42fed64c68..1594309700 100644 --- a/coordinator/changefeed/backoff.go +++ b/coordinator/changefeed/backoff.go @@ -17,12 +17,11 @@ import ( "time" "github.com/cenkalti/backoff/v4" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -182,13 +181,13 @@ func (m *Backoff) StartFinished() { // ShouldFailChangefeed return true if a running error contains a changefeed not retry error. func ShouldFailChangefeed(e *heartbeatpb.RunningError) bool { - return cerrors.ShouldFailChangefeed(errors.New(e.Message + e.Code)) + return errors.ShouldFailChangefeed(errors.New(e.Message + e.Code)) } func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbeatpb.RunningError) { // if there are a fastFail error in errs, we can just fastFail the changefeed for _, err := range errs { - if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) || + if errors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) || ShouldFailChangefeed(err) { return true, err } diff --git a/coordinator/changefeed/etcd_backend.go b/coordinator/changefeed/etcd_backend.go index 6d9de399e3..ab7c98fafd 100644 --- a/coordinator/changefeed/etcd_backend.go +++ b/coordinator/changefeed/etcd_backend.go @@ -19,11 +19,10 @@ import ( "fmt" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -154,7 +153,7 @@ func (b *EtcdBackend) CreateChangefeed(ctx context.Context, return errors.Trace(err) } if !resp.Succeeded { - err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("create changefeed %s", info.ChangefeedID.Name())) + err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("create changefeed %s", info.ChangefeedID.Name())) return errors.Trace(err) } return nil @@ -186,7 +185,7 @@ func (b *EtcdBackend) UpdateChangefeed(ctx context.Context, info *config.ChangeF return errors.Trace(err) } if !putResp.Succeeded { - err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed %s failed", info.ChangefeedID.Name())) + err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed %s failed", info.ChangefeedID.Name())) return errors.Trace(err) } return nil @@ -223,7 +222,7 @@ func (b *EtcdBackend) PauseChangefeed(ctx context.Context, id common.ChangeFeedI return errors.Trace(err) } if !putResp.Succeeded { - err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("pause changefeed %s failed", id.DisplayName)) + err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("pause changefeed %s failed", id.DisplayName)) return errors.Trace(err) } return nil @@ -242,7 +241,7 @@ func (b *EtcdBackend) DeleteChangefeed(ctx context.Context, return errors.Trace(err) } if !resp.Succeeded { - err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("delete changefeed %s", changefeedID.Name())) + err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("delete changefeed %s", changefeedID.Name())) return errors.Trace(err) } return nil @@ -284,7 +283,7 @@ func (b *EtcdBackend) ResumeChangefeed(ctx context.Context, return errors.Trace(err) } if !putResp.Succeeded { - err = cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name())) + err = errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("resume changefeed %s", info.ChangefeedID.Name())) return errors.Trace(err) } return nil @@ -336,7 +335,7 @@ func (b *EtcdBackend) SetChangefeedProgress(ctx context.Context, id common.Chang } } - err := cerror.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed to %s-%d", id.DisplayName, progress)) + err := errors.ErrMetaOpFailed.GenWithStackByArgs(fmt.Sprintf("update changefeed to %s-%d", id.DisplayName, progress)) return errors.Trace(err) } diff --git a/pkg/api/util.go b/pkg/api/util.go index 465b8a279d..5155a1cce7 100644 --- a/pkg/api/util.go +++ b/pkg/api/util.go @@ -19,9 +19,8 @@ import ( "strings" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" - cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/util" "go.uber.org/zap" ) @@ -57,7 +56,7 @@ type HTTPError struct { // NewHTTPError wrap a err into HTTPError func NewHTTPError(err error) HTTPError { - errCode, _ := cerror.RFCCode(err) + errCode, _ := errors.RFCCode(err) return HTTPError{ Error: err.Error(), Code: string(errCode), @@ -66,11 +65,11 @@ func NewHTTPError(err error) HTTPError { // httpBadRequestError is some errors that will cause a BadRequestError in http handler var httpBadRequestError = []*errors.Error{ - cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC, - cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible, - cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError, - cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist, cerror.ErrSchedulerRequestFailed, - cerror.ErrActiveActiveTSOIndexIncompatible, + errors.ErrAPIInvalidParam, errors.ErrSinkURIInvalid, errors.ErrStartTsBeforeGC, + errors.ErrChangeFeedNotExists, errors.ErrTargetTsBeforeStartTs, errors.ErrTableIneligible, + errors.ErrFilterRuleInvalid, errors.ErrChangefeedUpdateRefused, errors.ErrMySQLConnectionError, + errors.ErrMySQLInvalidConfig, errors.ErrCaptureNotExist, errors.ErrSchedulerRequestFailed, + errors.ErrActiveActiveTSOIndexIncompatible, } const ( @@ -107,7 +106,7 @@ func IsHTTPBadRequestError(err error) bool { return true } - rfcCode, ok := cerror.RFCCode(err) + rfcCode, ok := errors.RFCCode(err) if ok && e.RFCCode() == rfcCode { return true } diff --git a/pkg/common/event/dml_event_test.go b/pkg/common/event/dml_event_test.go index 9a0ed27e99..ce4b59f417 100644 --- a/pkg/common/event/dml_event_test.go +++ b/pkg/common/event/dml_event_test.go @@ -145,7 +145,7 @@ func TestBatchDMLEvent(t *testing.T) { // case 2: unsupported version batchDMLEvent.Version = 100 - data, err = batchDMLEvent.Marshal() + _, err = batchDMLEvent.Marshal() require.Error(t, err) require.Contains(t, err.Error(), "unsupported BatchDMLEvent version") } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index c8fffb7afe..3f53c28791 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -21,10 +21,8 @@ import ( "time" "github.com/benbjohnson/clock" - "github.com/pingcap/errors" "github.com/pingcap/log" - cerror "github.com/pingcap/ticdc/pkg/errors" - pkgerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/security" "github.com/pingcap/ticdc/pkg/util" @@ -344,7 +342,7 @@ func (c *ClientImpl) RequestProgress(ctx context.Context) error { func isRetryableError(rpcName string) retry.IsRetryable { return func(err error) bool { - if !cerror.IsRetryableError(err) { + if !errors.IsRetryableError(err) { return false } @@ -355,7 +353,7 @@ func isRetryableError(rpcName string) retry.IsRetryable { return false } case EtcdTxn: - return pkgerror.IsRetryableEtcdError(err) + return errors.IsRetryableEtcdError(err) default: // For other types of operation, we retry directly without handling errors } @@ -613,5 +611,5 @@ func IsHealthy(ctx context.Context, client *clientv3.Client) bool { _, err := client.Get(ctx, healthyPath) // permission denied is OK since proposal goes through consensus to get it // See: https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L124 - return err == nil || cerror.Is(err, v3rpc.ErrPermissionDenied) + return err == nil || errors.Is(err, v3rpc.ErrPermissionDenied) } diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 87bfac66ea..bd738c426a 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -22,11 +22,10 @@ import ( "strings" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/security" @@ -175,7 +174,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi metaVersion, err := getMetaVersion(ctx, m.cli.GetEtcdClient(), m.cli.GetClusterID()) if err != nil { log.Error("get meta version failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } if metaVersion > m.newMetaVersion { @@ -195,7 +194,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi _, err := m.cli.GetEtcdClient().Put(ctx, m.metaVersionKey, fmt.Sprintf("%d", oldVersion)) if err != nil { log.Error("put meta version failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } } @@ -209,7 +208,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi } log.Error("campaign old owner failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } beforeKV := make(map[string][]byte) @@ -219,7 +218,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi if err != nil { log.Error("get old meta data failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } for _, v := range resp.Kvs { oldKey := string(v.Key) @@ -233,7 +232,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi log.Error("unmarshal changefeed failed", zap.String("value", string(v.Value)), zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } info.UpstreamID = upstreamID info.ChangefeedID.DisplayName.Keyspace = common.DefaultKeyspaceName @@ -245,7 +244,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi if err != nil { log.Error("marshal changefeed failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } _, err = m.cli.GetEtcdClient().Put(ctx, newKey, str) } else { @@ -254,7 +253,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi if err != nil { log.Error("put new meta data failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } } } @@ -263,21 +262,21 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi if err != nil { log.Error("save default upstream failed, "+ "etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } err = m.migrateGcServiceSafePoint(ctx, pdClient, m.config.Security, m.cli.GetGCServiceID(), m.config.GcTTL) if err != nil { log.Error("update meta version failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } // 5. update metaVersion _, err = m.cli.GetEtcdClient().Put(ctx, m.metaVersionKey, fmt.Sprintf("%d", m.newMetaVersion)) if err != nil { log.Error("update meta version failed, etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } log.Info("etcd data migration successful") cleanOldData(ctx, m.cli.GetEtcdClient()) @@ -450,7 +449,7 @@ func (m *migrator) Migrate(ctx context.Context) error { log.Error("save default upstream failed, "+ "etcd meta data migration failed", zap.Error(err)) - return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err) + return errors.WrapError(errors.ErrEtcdMigrateFailed, err) } _, err := m.cli.GetEtcdClient(). Put(ctx, m.metaVersionKey, fmt.Sprintf("%d", newVersion)) diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index a118278a1d..460be14657 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -18,11 +18,10 @@ import ( "time" "github.com/goccy/go-json" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/orchestrator/util" "go.uber.org/zap" @@ -111,7 +110,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro var newCaptureInfo config.CaptureInfo err := newCaptureInfo.Unmarshal(value) if err != nil { - return cerrors.ErrUnmarshalFailed.Wrap(err).GenWithStackByArgs() + return errors.ErrUnmarshalFailed.Wrap(err).GenWithStackByArgs() } log.Info("remote capture online", zap.Any("info", newCaptureInfo), zap.String("role", s.Role)) @@ -151,7 +150,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro var newUpstreamInfo config.UpstreamInfo err := newUpstreamInfo.Unmarshal(value) if err != nil { - return cerrors.ErrUnmarshalFailed.Wrap(err).GenWithStackByArgs() + return errors.ErrUnmarshalFailed.Wrap(err).GenWithStackByArgs() } log.Info("new upstream is add", zap.Uint64("upstream", k.UpstreamID), zap.Any("info", newUpstreamInfo), zap.String("role", s.Role)) @@ -521,7 +520,7 @@ func (s *ChangefeedReactorState) CheckCaptureAlive(captureID config.CaptureID) { // The key-value pair of capture info is written with lease, // so if the capture info is not exist, the lease is expired if len(v) == 0 { - return v, false, cerrors.ErrLeaseExpired.GenWithStackByArgs() + return v, false, errors.ErrLeaseExpired.GenWithStackByArgs() } return v, false, nil }, @@ -537,7 +536,7 @@ func (s *ChangefeedReactorState) CheckChangefeedNormal() { s.PatchInfo(func(info *config.ChangeFeedInfo) (*config.ChangeFeedInfo, bool, error) { if info == nil || info.AdminJobType.IsStopState() { s.skipPatchesInThisTick = true - return info, false, cerrors.ErrEtcdTryAgain.GenWithStackByArgs() + return info, false, errors.ErrEtcdTryAgain.GenWithStackByArgs() } return info, false, nil }) @@ -547,7 +546,7 @@ func (s *ChangefeedReactorState) CheckChangefeedNormal() { } if status.AdminJobType.IsStopState() { s.skipPatchesInThisTick = true - return status, false, cerrors.ErrEtcdTryAgain.GenWithStackByArgs() + return status, false, errors.ErrEtcdTryAgain.GenWithStackByArgs() } return status, false, nil }) @@ -613,7 +612,7 @@ func (s *ChangefeedReactorState) patchAny(key string, tpi interface{}, fn func(i Key: util.NewEtcdKey(key), Func: func(v []byte) ([]byte, bool, error) { if s.skipPatchesInThisTick { - return v, false, cerrors.ErrEtcdIgnore.GenWithStackByArgs() + return v, false, errors.ErrEtcdIgnore.GenWithStackByArgs() } var e interface{} if v != nil { diff --git a/pkg/orchestrator/reactor_state_tester.go b/pkg/orchestrator/reactor_state_tester.go index a59241d83d..f5b93dabfd 100644 --- a/pkg/orchestrator/reactor_state_tester.go +++ b/pkg/orchestrator/reactor_state_tester.go @@ -16,8 +16,7 @@ package orchestrator import ( "testing" - "github.com/pingcap/errors" - cerrors "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/orchestrator/util" "github.com/stretchr/testify/require" ) @@ -82,9 +81,9 @@ RetryLoop: changedSet := make(map[util.EtcdKey]struct{}) for _, patch := range patches { err := patch.Patch(tmpKVEntries, changedSet) - if cerrors.ErrEtcdIgnore.Equal(errors.Cause(err)) { + if errors.ErrEtcdIgnore.Equal(errors.Cause(err)) { continue - } else if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { + } else if errors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { continue RetryLoop } else if err != nil { return errors.Trace(err) diff --git a/pkg/sink/kafka/mock_factory.go b/pkg/sink/kafka/mock_factory.go index ddc0362e61..a19f921421 100644 --- a/pkg/sink/kafka/mock_factory.go +++ b/pkg/sink/kafka/mock_factory.go @@ -107,9 +107,7 @@ func (m *MockSaramaSyncProducer) Close() { _ = m.SyncProducer.Close() } -func (m *MockSaramaSyncProducer) Heartbeat() { - return -} +func (m *MockSaramaSyncProducer) Heartbeat() {} // MockSaramaAsyncProducer is a mock implementation of AsyncProducer interface. type MockSaramaAsyncProducer struct { @@ -176,9 +174,7 @@ func (p *MockSaramaAsyncProducer) AsyncSend(ctx context.Context, topic string, p return nil } -func (p *MockSaramaAsyncProducer) Heartbeat() { - return -} +func (p *MockSaramaAsyncProducer) Heartbeat() {} // Close implement the AsyncProducer interface. func (p *MockSaramaAsyncProducer) Close() { diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go index 01e5f63a65..f4fc860bd6 100644 --- a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -470,7 +470,7 @@ func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) true, timodel.ActionMultiSchemaChange, ) - sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + _ = assertRestoreAnonymousIndexToNamedIndex( t, helper, "alter table t_anon_idx add column e int, add index (a)", @@ -486,7 +486,7 @@ func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) false, timodel.ActionAddIndex, ) - sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + _ = assertRestoreAnonymousIndexToNamedIndex( t, helper, "alter table t_anon_idx add index (a)", @@ -494,7 +494,7 @@ func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) true, timodel.ActionAddIndex, ) - sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + _ = assertRestoreAnonymousIndexToNamedIndex( t, helper, "alter table t_anon_idx add index (a)", @@ -502,7 +502,7 @@ func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) true, timodel.ActionAddIndex, ) - sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + _ = assertRestoreAnonymousIndexToNamedIndex( t, helper, "alter table t_anon_idx add index (a)", diff --git a/pkg/sink/mysql/mysql_writer_dml_test.go b/pkg/sink/mysql/mysql_writer_dml_test.go index 8685fb064d..02ed4b3c14 100644 --- a/pkg/sink/mysql/mysql_writer_dml_test.go +++ b/pkg/sink/mysql/mysql_writer_dml_test.go @@ -278,7 +278,7 @@ func TestGenerateBatchSQL(t *testing.T) { writer.cfg.SafeMode = true writer.cfg.MaxTxnRow = 3 - sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, _ = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) @@ -313,7 +313,7 @@ func TestGenerateBatchSQL(t *testing.T) { // Measure execution time start := time.Now() - sql, args, rowTypes = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlEvent}) + sql, args, _ = writer.generateBatchSQL([]*commonEvent.DMLEvent{dmlEvent}) duration := time.Since(start) // Verify performance requirement @@ -361,7 +361,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Delete A + Update A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (2, 'test')", "delete from t where id = 2") dmlUpdateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (2, 'test')", "update t set name = 'test2' where id = 2") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlUpdateEvent}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) require.Equal(t, 2, len(rowTypes)) @@ -372,7 +372,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Insert A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test')", "delete from t where id = 3") dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (3, 'test')") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -382,7 +382,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (4, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 4") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (4, 'test')", "update t set name = 'test4' where id = 4") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -391,7 +391,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { // Update A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (5, 'test5')", "delete from t where id = 5") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (5, 'test')", "update t set name = 'test5' where id = 5") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -401,7 +401,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test')", "update t set name = 'test6' where id = 6") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 6") dmlUpdateEvent2, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test6')", "update t set name = 'test7' where id = 6") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "UPDATE `test`.`t` SET `id` = ?, `name` = ? WHERE `id` = ? LIMIT 1", sql[0]) @@ -412,7 +412,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (7, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 7") dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (7, 'test2')") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) require.Equal(t, 2, len(rowTypes)) @@ -428,7 +428,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (8, 'test')", "update t set name = 'test8' where id = 8") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 8") dmlDeleteEvent2 := helper.DML2DeleteEvent("test", "t", "insert into t values (8, 'test8')", "delete from t where id = 8") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?),(?))", sql[0]) @@ -443,7 +443,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (9, 'test9')", "update t set name = 'test10' where id = 9") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 9") dmlDeleteEvent2 = helper.DML2DeleteEvent("test", "t", "insert into t values (9, 'test10')", "delete from t where id = 9") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?),(?))", sql[0]) @@ -454,7 +454,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 10") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (10, 'test')", "delete from t where id = 10") dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (10, 'test2')") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) require.Equal(t, 2, len(rowTypes)) @@ -469,7 +469,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (11, 'test')", "update t set name = 'test11' where id = 11") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 11") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (11, 'test11')", "delete from t where id = 11") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -481,7 +481,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test')", "update t set name = 'test12' where id = 12") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 12") dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test12')", "update t set name = 'test13' where id = 12") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -492,7 +492,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 13") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (14, 'test')", "delete from t where id = 14") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (15, 'test')", "update t set name = 'test15' where id = 15") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 3, len(sql)) require.Equal(t, 3, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -508,7 +508,7 @@ func TestGenerateBatchSQLInUnSafeMode(t *testing.T) { dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args, rowTypes = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, _ = writer.generateBatchSQLInUnSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "INSERT INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) @@ -538,7 +538,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Insert A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (3, 'test')", "delete from t where id = 3") dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (3, 'test')") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -548,7 +548,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (4, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 4") dmlUpdateEvent, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (4, 'test')", "update t set name = 'test4' where id = 4") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -557,7 +557,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { // Update A + Delete A dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (5, 'test5')", "delete from t where id = 5") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (5, 'test')", "update t set name = 'test5' where id = 5") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -567,7 +567,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test')", "update t set name = 'test6' where id = 6") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 6") dmlUpdateEvent2, _ := helper.DML2UpdateEvent("test", "t", "insert into t values (6, 'test6')", "update t set name = 'test7' where id = 6") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -578,7 +578,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent = helper.DML2Event("test", "t", "insert into t values (7, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 7") dmlInsertEvent2 := helper.DML2Event("test", "t", "insert into t values (7, 'test2')") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -591,7 +591,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (8, 'test')", "update t set name = 'test8' where id = 8") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 8") dmlDeleteEvent2 := helper.DML2DeleteEvent("test", "t", "insert into t values (8, 'test8')", "delete from t where id = 8") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -606,7 +606,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (9, 'test9')", "update t set name = 'test10' where id = 9") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 9") dmlDeleteEvent2 = helper.DML2DeleteEvent("test", "t", "insert into t values (9, 'test10')", "delete from t where id = 9") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlDeleteEvent, dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2, dmlDeleteEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -617,7 +617,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 10") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (10, 'test')", "delete from t where id = 10") dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (10, 'test2')") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlInsertEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -629,7 +629,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (11, 'test')", "update t set name = 'test11' where id = 11") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 11") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (11, 'test11')", "delete from t where id = 11") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlDeleteEvent}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "DELETE FROM `test`.`t` WHERE (`id`) IN ((?))", sql[0]) @@ -641,7 +641,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test')", "update t set name = 'test12' where id = 12") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 12") dmlUpdateEvent2, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (12, 'test12')", "update t set name = 'test13' where id = 12") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlUpdateEvent, dmlUpdateEvent2}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?)", sql[0]) @@ -652,7 +652,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { helper.ExecuteDeleteDml("test", "t", "delete from t where id = 13") dmlDeleteEvent = helper.DML2DeleteEvent("test", "t", "insert into t values (14, 'test')", "delete from t where id = 14") dmlUpdateEvent, _ = helper.DML2UpdateEvent("test", "t", "insert into t values (15, 'test')", "update t set name = 'test15' where id = 15") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlDeleteEvent, dmlUpdateEvent}) require.Equal(t, 2, len(sql)) require.Equal(t, 2, len(args)) require.Equal(t, 2, len(rowTypes)) @@ -673,7 +673,7 @@ func TestGenerateBatchSQLInSafeMode(t *testing.T) { dmlInsertEvent2 = helper.DML2Event("test", "t", "insert into t values (17, 'test')") helper.ExecuteDeleteDml("test", "t", "delete from t where id = 17") dmlInsertEvent3 := helper.DML2Event("test", "t", "insert into t values (18, 'test')") - sql, args, rowTypes = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) + sql, args, _ = writer.generateBatchSQLInSafeMode([]*commonEvent.DMLEvent{dmlInsertEvent, dmlInsertEvent2, dmlInsertEvent3}) require.Equal(t, 1, len(sql)) require.Equal(t, 1, len(args)) require.Equal(t, "REPLACE INTO `test`.`t` (`id`,`name`) VALUES (?,?),(?,?),(?,?)", sql[0]) From 7d28f1e9f57cd4e279a11c14a01a336384fe38ff Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 14 May 2026 22:16:12 +0800 Subject: [PATCH 3/4] fix more code --- cmd/kafka-consumer/writer_test.go | 3 +-- logservice/logpuller/region_request_worker.go | 2 +- logservice/schemastore/multi_version_test_utils.go | 6 ------ maintainer/maintainer_controller_test.go | 4 ---- .../replica/default_span_split_checker_test.go | 8 ++------ maintainer/replica/split_span_checker_test.go | 4 +--- pkg/applier/redo.go | 2 +- pkg/applier/redo_test.go | 7 +++---- pkg/applier/splitter.go | 8 +++----- pkg/redo/codec/codec_test.go | 1 + pkg/sink/codec/builder.go | 3 +-- pkg/sink/codec/open/codec_test.go | 12 ++++++------ pkg/sink/codec/open/encoder_test.go | 1 + .../mysql/mysql_writer_dml_active_active_test.go | 4 ++-- pkg/tcpserver/tcp_server_test.go | 10 +++++----- pkg/upstream/upstream.go | 3 +-- server/module_election.go | 6 +++--- tests/integration_tests/default_value/main.go | 8 ++++---- tests/integration_tests/multi_source/main.go | 8 ++++---- tests/integration_tests/util/db.go | 4 ++-- 20 files changed, 42 insertions(+), 62 deletions(-) diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go index ec9aa34a43..e4322523c6 100644 --- a/cmd/kafka-consumer/writer_test.go +++ b/cmd/kafka-consumer/writer_test.go @@ -311,14 +311,13 @@ func TestAppendRow2Group_DoesNotDropCommitTsFallbackBeforeApplied(t *testing.T) group := progress.eventsGroup[1] require.NotNil(t, group) - resolvedEvents := make([]*commonEvent.DMLEvent, 0) // Expect: commitTs=100 is still kept and can be resolved. resolved := group.ResolveInto(150, nil) require.Len(t, resolved, 1) require.Equal(t, uint64(100), resolved[0].CommitTs) // Step 3: once downstream has flushed beyond commitTs=100, the replay is safe to ignore. - resolvedEvents = make([]*commonEvent.DMLEvent, 0) + resolvedEvents := make([]*commonEvent.DMLEvent, 0) group.AppliedWatermark = 200 w.appendRow2Group(newDMLEvent(1, 100), progress, kafka.Offset(12)) resolved = group.ResolveInto(150, resolvedEvents) diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 142f4c6493..a9ccc09678 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -105,7 +105,7 @@ func newRegionRequestWorker( } var regionErr error if err := version.CheckStoreVersion(ctx, worker.client.pd); err != nil { - if errors.Cause(err) == context.Canceled { + if cerror.Is(errors.Cause(err), context.Canceled) { return nil } log.Error("event feed check store version fails", diff --git a/logservice/schemastore/multi_version_test_utils.go b/logservice/schemastore/multi_version_test_utils.go index b066e16478..02c2d4bf58 100644 --- a/logservice/schemastore/multi_version_test_utils.go +++ b/logservice/schemastore/multi_version_test_utils.go @@ -61,12 +61,6 @@ func buildRecoverTableEventForTest(schemaID, tableID int64, schemaName, tableNam } func buildCreatePartitionTableEventForTest(schemaID, tableID int64, schemaName, tableName string, partitionIDs []int64, finishedTs uint64) *PersistedDDLEvent { - partitionDefinitions := make([]model.PartitionDefinition, 0, len(partitionIDs)) - for _, partitionID := range partitionIDs { - partitionDefinitions = append(partitionDefinitions, model.PartitionDefinition{ - ID: partitionID, - }) - } return &PersistedDDLEvent{ Type: byte(model.ActionCreateTable), SchemaID: schemaID, diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index dbd18f7de8..40847d323a 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -1804,8 +1804,6 @@ func TestLargeTableInitialization(t *testing.T) { require.Equal(t, 10, controller.spanController.GetAbsentSize()) - spanIDList := []common.DispatcherID{} - controller.schedulerController.GetScheduler(scheduler.BalanceScheduler).Execute() controller.schedulerController.GetScheduler(scheduler.BalanceSplitScheduler).Execute() require.Equal(t, 0, controller.operatorController.OperatorSize()) @@ -1818,7 +1816,6 @@ func TestLargeTableInitialization(t *testing.T) { for _, op := range controller.operatorController.GetAllOperators() { require.Equal(t, "add", op.Type()) - spanIDList = append(spanIDList, op.ID()) op.Start() op.PostFinish() controller.operatorController.RemoveOp(op.ID()) @@ -1837,7 +1834,6 @@ func TestLargeTableInitialization(t *testing.T) { require.Equal(t, 4, controller.operatorController.OperatorSize()) for _, op := range controller.operatorController.GetAllOperators() { require.Equal(t, "add", op.Type()) - spanIDList = append(spanIDList, op.ID()) op.Start() op.PostFinish() controller.operatorController.RemoveOp(op.ID()) diff --git a/maintainer/replica/default_span_split_checker_test.go b/maintainer/replica/default_span_split_checker_test.go index c7748a693f..b01e853c82 100644 --- a/maintainer/replica/default_span_split_checker_test.go +++ b/maintainer/replica/default_span_split_checker_test.go @@ -182,8 +182,6 @@ func TestDefaultSpanSplitChecker_UpdateStatus_RegionCheck(t *testing.T) { checker.AddReplica(replica) - spanStatus := checker.allTasks[replica.ID] - status := &heartbeatpb.TableSpanStatus{ ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, @@ -194,7 +192,7 @@ func TestDefaultSpanSplitChecker_UpdateStatus_RegionCheck(t *testing.T) { // Test region count update checker.UpdateStatus(replica) - spanStatus = checker.allTasks[replica.ID] + spanStatus := checker.allTasks[replica.ID] require.Equal(t, 6, spanStatus.regionCount) require.Contains(t, checker.splitReadyTasks, replica.ID) } @@ -227,10 +225,8 @@ func TestDefaultSpanSplitChecker_UpdateStatus_RegionCheckError(t *testing.T) { } replica.UpdateStatus(status) - spanStatus := checker.allTasks[replica.ID] - // Test region check error handling - spanStatus = checker.allTasks[replica.ID] + spanStatus := checker.allTasks[replica.ID] require.Equal(t, 0, spanStatus.regionCount) // Should remain 0 due to error } diff --git a/maintainer/replica/split_span_checker_test.go b/maintainer/replica/split_span_checker_test.go index 65d1b5ddde..7f5d29912c 100644 --- a/maintainer/replica/split_span_checker_test.go +++ b/maintainer/replica/split_span_checker_test.go @@ -305,8 +305,6 @@ func TestSplitSpanChecker_UpdateStatus_Region(t *testing.T) { replica := replicas[0] checker.AddReplica(replica) - spanStatus := checker.allTasks[replica.ID] - status := &heartbeatpb.TableSpanStatus{ ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, @@ -318,7 +316,7 @@ func TestSplitSpanChecker_UpdateStatus_Region(t *testing.T) { checker.UpdateStatus(replica) // Test region count above threshold - spanStatus = checker.allTasks[replica.ID] + spanStatus := checker.allTasks[replica.ID] require.Equal(t, 6, spanStatus.regionCount) // Test region check interval enforcement diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index ad07777556..f3b7d78741 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -480,7 +480,7 @@ func (ra *RedoApplier) Apply(egCtx context.Context) (err error) { }) err = eg.Wait() - if errors.Cause(err) != errApplyFinished { + if !errors.Is(errors.Cause(err), errApplyFinished) { return err } return nil diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 13a7b0cec5..a5e47784cf 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -26,7 +26,6 @@ import ( "github.com/phayes/freeport" dmysql "github.com/pingcap/ticdc/downstreamadapter/sink/mysql" "github.com/pingcap/ticdc/pkg/common" - commonType "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" misc "github.com/pingcap/ticdc/pkg/redo/common" @@ -94,8 +93,8 @@ func (br *MockReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs ui } // GetChangefeedID implements LogReader.GetChangefeedID -func (br *MockReader) GetChangefeedID() commonType.ChangeFeedID { - return commonType.ChangeFeedID{} +func (br *MockReader) GetChangefeedID() common.ChangeFeedID { + return common.ChangeFeedID{} } // GetVersion implements LogReader.GetVersion @@ -104,7 +103,7 @@ func (br *MockReader) GetVersion() int { } func newFlag(flag uint) uint64 { - var result commonType.ColumnFlagType + var result common.ColumnFlagType if flag == pmysql.PriKeyFlag { result.SetIsHandleKey() result.SetIsPrimaryKey() diff --git a/pkg/applier/splitter.go b/pkg/applier/splitter.go index 1793ac4e69..674d2d3e1a 100644 --- a/pkg/applier/splitter.go +++ b/pkg/applier/splitter.go @@ -18,9 +18,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" - commonType "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/redo/reader" "go.uber.org/zap" ) @@ -125,10 +123,10 @@ func (u *updateEventSplitter) readNextRow(ctx context.Context) (*event.RedoDMLEv // processEvent return (event to emit, pending event) func processEvent( - event *commonEvent.RedoDMLEvent, - prevTxnStartTs commonType.Ts, + event *event.RedoDMLEvent, + prevTxnStartTs common.Ts, tempStorage *tempTxnInsertEventStorage, -) (*commonEvent.RedoDMLEvent, *commonEvent.RedoDMLEvent, error) { +) (*event.RedoDMLEvent, *event.RedoDMLEvent, error) { if event == nil { log.Panic("event should not be nil") } diff --git a/pkg/redo/codec/codec_test.go b/pkg/redo/codec/codec_test.go index 7418ebc2c0..4322e65197 100644 --- a/pkg/redo/codec/codec_test.go +++ b/pkg/redo/codec/codec_test.go @@ -44,5 +44,6 @@ func TestDDLRedoConvert(t *testing.T) { require.Zero(t, len(data)) data2, err := MarshalRedoLog(redoLog2, nil) + require.Nil(t, err) require.Equal(t, data1, data2) } diff --git a/pkg/sink/codec/builder.go b/pkg/sink/codec/builder.go index 8a17146921..d0f44b5342 100644 --- a/pkg/sink/codec/builder.go +++ b/pkg/sink/codec/builder.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" - cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec/avro" "github.com/pingcap/ticdc/pkg/sink/codec/canal" "github.com/pingcap/ticdc/pkg/sink/codec/common" @@ -60,7 +59,7 @@ func NewEventDecoder( case config.ProtocolAvro: schemaM, err := avro.NewConfluentSchemaManager(ctx, codecConfig.AvroConfluentSchemaRegistry, nil) if err != nil { - return nil, cerror.Trace(err) + return nil, errors.Trace(err) } return avro.NewDecoder(codecConfig, idx, schemaM, topic, upstreamTiDB), nil case config.ProtocolSimple: diff --git a/pkg/sink/codec/open/codec_test.go b/pkg/sink/codec/open/codec_test.go index c677caeda6..2764c7fd4c 100644 --- a/pkg/sink/codec/open/codec_test.go +++ b/pkg/sink/codec/open/codec_test.go @@ -105,7 +105,7 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { insertEventNoHandleKey.Rewind() require.True(t, ok) require.NotNil(t, row) - key, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ + _, _, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ TableInfo: insertEventNoHandleKey.TableInfo, Event: row, CommitTs: insertEventNoHandleKey.CommitTs, @@ -119,7 +119,7 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { require.NotNil(t, row) row.PreRow = row.Row config.DeleteOnlyHandleKeyColumns = true - key, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ + _, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ TableInfo: insertEvent.TableInfo, Event: row, CommitTs: insertEvent.CommitTs, @@ -145,7 +145,7 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { require.True(t, ok) require.NotNil(t, row) row.PreRow = row.Row - key, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ + _, _, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ TableInfo: insertEventNoHandleKey.TableInfo, Event: row, CommitTs: insertEventNoHandleKey.CommitTs, @@ -160,7 +160,7 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { row.PreRow = row.Row row.Row = chunk.Row{} config.DeleteOnlyHandleKeyColumns = true - key, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ + _, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ TableInfo: insertEvent.TableInfo, Event: row, CommitTs: insertEvent.CommitTs, @@ -171,7 +171,7 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { require.NotContains(t, string(value), "a") config.DeleteOnlyHandleKeyColumns = false - key, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ + _, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ TableInfo: insertEvent.TableInfo, Event: row, CommitTs: insertEvent.CommitTs, @@ -199,7 +199,7 @@ func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { require.NotNil(t, row) row.PreRow = row.Row row.Row = chunk.Row{} - key, value, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ + _, _, _, err = encodeRowChangedEvent(&commonEvent.RowEvent{ TableInfo: insertEventNoHandleKey.TableInfo, Event: row, CommitTs: insertEventNoHandleKey.CommitTs, diff --git a/pkg/sink/codec/open/encoder_test.go b/pkg/sink/codec/open/encoder_test.go index 463fa69b14..81123500b0 100644 --- a/pkg/sink/codec/open/encoder_test.go +++ b/pkg/sink/codec/open/encoder_test.go @@ -141,6 +141,7 @@ func TestIntegerTypes(t *testing.T) { 9223372036854775807, 18446744073709551615)` maxValues := helper.DML2Event("test", "t", sql) maxRow, ok := maxValues.GetNextRow() + require.True(t, ok) maxValueEvent := &commonEvent.RowEvent{ TableInfo: tableInfo, diff --git a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go index f3cea842a1..3db1d0b246 100644 --- a/pkg/sink/mysql/mysql_writer_dml_active_active_test.go +++ b/pkg/sink/mysql/mysql_writer_dml_active_active_test.go @@ -159,7 +159,7 @@ func TestActiveActiveDropRowsWithNonNullOriginTsForTiDBDownstream(t *testing.T) event := helper.DML2Event("test", "t", "insert into t values (1, 'a', 10, NULL)", ) - sqls, args, rowTypes := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) + sqls, args, _ := writer.generateActiveActiveNormalSQLs([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 0) require.Len(t, args, 0) @@ -167,7 +167,7 @@ func TestActiveActiveDropRowsWithNonNullOriginTsForTiDBDownstream(t *testing.T) "insert into t values (2, 'b', 11, NULL)", "insert into t values (3, 'c', 12, NULL)", ) - sqls, args, rowTypes = writer.generateActiveActiveBatchSQLForPerEvent([]*commonEvent.DMLEvent{event}) + sqls, args, rowTypes := writer.generateActiveActiveBatchSQLForPerEvent([]*commonEvent.DMLEvent{event}) require.Len(t, sqls, 0) require.Len(t, args, 0) require.Len(t, rowTypes, 0) diff --git a/pkg/tcpserver/tcp_server_test.go b/pkg/tcpserver/tcp_server_test.go index 0225f0385b..588f0d62ce 100644 --- a/pkg/tcpserver/tcp_server_test.go +++ b/pkg/tcpserver/tcp_server_test.go @@ -41,7 +41,7 @@ func TestTCPServerInsecureHTTP1(t *testing.T) { server, err := NewTCPServer(addr, &security.Credential{}) require.NoError(t, err) defer func() { - err := server.Close(nil) + err := server.Close(context.TODO()) require.NoError(t, err) }() @@ -78,7 +78,7 @@ func TestTCPServerTLSHTTP1(t *testing.T) { require.True(t, server.IsTLSEnabled()) defer func() { - err := server.Close(nil) + err := server.Close(context.TODO()) require.NoError(t, err) }() @@ -114,7 +114,7 @@ func TestTCPServerInsecureGrpc(t *testing.T) { require.NoError(t, err) defer func() { - err := server.Close(nil) + err := server.Close(context.TODO()) require.NoError(t, err) }() @@ -151,7 +151,7 @@ func TestTCPServerTLSGrpc(t *testing.T) { require.True(t, server.IsTLSEnabled()) defer func() { - err := server.Close(nil) + err := server.Close(context.TODO()) require.NoError(t, err) }() @@ -355,7 +355,7 @@ func TestTcpServerClose(t *testing.T) { // Close should be idempotent. for i := 0; i < 3; i++ { - err := server.Close(nil) + err := server.Close(context.TODO()) require.NoError(t, err) } diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go index bcc0f2cdcd..df77841760 100644 --- a/pkg/upstream/upstream.go +++ b/pkg/upstream/upstream.go @@ -138,8 +138,7 @@ func CreateTiStore(ctx context.Context, urls string, credential *security.Creden } func isCreateTiStoreRetryable(ctx context.Context, err error) bool { - switch errors.Cause(err) { - case context.Canceled: + if errors.Is(errors.Cause(err), context.Canceled) { // Only stop retrying if the caller's context is canceled. // Otherwise treat it as transient (e.g. internal client cancellation). return ctx.Err() == nil diff --git a/server/module_election.go b/server/module_election.go index 6b0df0e266..7b6ab54c4f 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -156,7 +156,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { // use a new context to prevent the context from being cancelled. resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) if resignErr := e.resign(resignCtx); resignErr != nil { - if errors.Cause(resignErr) != context.DeadlineExceeded { + if !errors.Is(errors.Cause(resignErr), context.DeadlineExceeded) { log.Info("coordinator resign failed", zap.String("nodeID", nodeID), zap.Error(resignErr), zap.Int64("coordinatorVersion", coordinatorVersion)) cancel() @@ -176,7 +176,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { // we should keep the loop running to try to election coordinator again. // So, regardless of the cause of the context.Canceled, we should not exit here. // We should proceed to the next iteration of the loop, allowing subsequent logic to make the determination. - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { log.Warn("coordinator exited report error", zap.String("nodeID", nodeID), zap.Int64("coordinatorVersion", coordinatorVersion), zap.Error(err)) @@ -203,7 +203,7 @@ func (e *elector) campaignLogCoordinator(ctx context.Context) error { } err := rl.Wait(ctx) if err != nil { - if errors.Cause(err) == context.Canceled { + if errors.Is(errors.Cause(err), context.Canceled) { return nil } return errors.Trace(err) diff --git a/tests/integration_tests/default_value/main.go b/tests/integration_tests/default_value/main.go index d9f8cd153f..9197ce8dac 100644 --- a/tests/integration_tests/default_value/main.go +++ b/tests/integration_tests/default_value/main.go @@ -27,8 +27,8 @@ import ( "time" guuid "github.com/google/uuid" - "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/workerpool" "github.com/pingcap/ticdc/tests/integration_tests/util" "go.uber.org/zap" @@ -52,9 +52,9 @@ var finishIdx atomic.Int32 func main() { cfg := util.NewConfig() err := cfg.Parse(os.Args[1:]) - switch errors.Cause(err) { - case nil: - case flag.ErrHelp: + switch { + case errors.Cause(err) == nil: + case errors.Is(errors.Cause(err), flag.ErrHelp): os.Exit(0) default: log.S().Errorf("parse cmd flags err %s\n", err) diff --git a/tests/integration_tests/multi_source/main.go b/tests/integration_tests/multi_source/main.go index 8bcb672e2e..66b71e978b 100644 --- a/tests/integration_tests/multi_source/main.go +++ b/tests/integration_tests/multi_source/main.go @@ -27,8 +27,8 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/tests/integration_tests/util" "go.uber.org/zap" ) @@ -36,9 +36,9 @@ import ( func main() { cfg := util.NewConfig() err := cfg.Parse(os.Args[1:]) - switch errors.Cause(err) { - case nil: - case flag.ErrHelp: + switch { + case errors.Cause(err) == nil: + case errors.Is(errors.Cause(err), flag.ErrHelp): os.Exit(0) default: log.S().Errorf("parse cmd flags err %s\n", err) diff --git a/tests/integration_tests/util/db.go b/tests/integration_tests/util/db.go index 2c00e3ba0a..578efdc0e1 100644 --- a/tests/integration_tests/util/db.go +++ b/tests/integration_tests/util/db.go @@ -20,9 +20,9 @@ import ( "net/url" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/diff" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tidb/pkg/util/dbutil" "go.uber.org/zap" ) @@ -152,7 +152,7 @@ func MustExec(db *sql.DB, sql string, args ...interface{}) { func MustExecWithConn(ctx context.Context, conn *sql.Conn, sql string, args ...interface{}) { var err error _, err = conn.ExecContext(ctx, sql, args...) - if err != nil && errors.Cause(err) == context.DeadlineExceeded && errors.Cause(err) == context.Canceled { + if err != nil && errors.Is(errors.Cause(err), context.DeadlineExceeded) && errors.Is(errors.Cause(err), context.Canceled) { log.S().Fatal(err) } } From 2588951a2359a8591fd4ebd4e0a79a1db2e309cf Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 14 May 2026 22:27:42 +0800 Subject: [PATCH 4/4] fix more code --- api/v2/changefeed.go | 8 ++++---- cmd/pulsar-consumer/writer.go | 2 +- coordinator/coordinator_test.go | 2 +- coordinator/create_changefeed_gc_test.go | 6 +++--- .../eventcollector/event_collector.go | 2 +- downstreamadapter/eventcollector/helper.go | 4 ++-- .../sink/mysql/causality/helper.go | 2 +- logservice/eventstore/event_store_test.go | 2 +- logservice/schemastore/persist_storage.go | 4 ++-- maintainer/maintainer_controller_test.go | 20 +++++++++---------- maintainer/maintainer_manager_test.go | 2 +- .../operator/operator_controller_test.go | 2 +- maintainer/replica/split_span_checker.go | 6 +++--- maintainer/replica/split_span_checker_test.go | 18 ++++++++--------- maintainer/scheduler/balance_splits.go | 2 +- pkg/applier/redo_test.go | 4 ++-- pkg/common/event/active_active.go | 2 +- pkg/common/event/drop_event.go | 4 ++-- pkg/common/event/drop_event_test.go | 4 ++-- pkg/common/event/not_reusable_event_test.go | 2 +- pkg/common/event/ready_event_test.go | 2 +- pkg/common/event/resolved_ts_event.go | 4 ++-- pkg/common/event/sync_point_event.go | 2 +- pkg/encryption/tikv_http_client.go | 4 ++-- pkg/eventservice/event_broker.go | 2 +- pkg/eventservice/test_helper.go | 8 ++++---- pkg/logger/log_file_monitor.go | 4 ++-- pkg/messaging/helper.go | 2 +- pkg/messaging/remote_target.go | 4 ++-- .../reactor_state_capture_test.go | 4 ++-- pkg/pdutil/clock.go | 2 +- pkg/redo/testutil/config.go | 2 +- pkg/scheduler/replica/replication.go | 2 +- server/module_election.go | 5 ++--- utils/dynstream/event_queue.go | 2 +- utils/dynstream/memory_control.go | 4 ++-- 36 files changed, 75 insertions(+), 76 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 29b7c8bf2c..0a4327aca1 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -1177,7 +1177,7 @@ func (h *OpenAPIV2) MoveTable(c *gin.Context) { return } } - err = maintainer.MoveTable(int64(tableId), node.ID(targetNodeID), mode, wait) + err = maintainer.MoveTable(tableId, node.ID(targetNodeID), mode, wait) if err != nil { log.Error("failed to move table", zap.Error(err), zap.Int64("tableID", tableId), zap.String("targetNodeID", targetNodeID)) _ = c.Error(err) @@ -1250,7 +1250,7 @@ func (h *OpenAPIV2) MoveSplitTable(c *gin.Context) { targetNodeID := c.Query("targetNodeID") mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64) - err = maintainer.MoveSplitTable(int64(tableId), node.ID(targetNodeID), mode) + err = maintainer.MoveSplitTable(tableId, node.ID(targetNodeID), mode) if err != nil { log.Error("failed to move split table", zap.Error(err), zap.Int64("tableID", tableId), zap.String("targetNodeID", targetNodeID)) _ = c.Error(err) @@ -1325,7 +1325,7 @@ func (h *OpenAPIV2) SplitTableByRegionCount(c *gin.Context) { return } mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64) - err = maintainer.SplitTableByRegionCount(int64(tableId), mode) + err = maintainer.SplitTableByRegionCount(tableId, mode) if err != nil { log.Error("failed to split table by region count", zap.Error(err), zap.Int64("tableID", tableId)) _ = c.Error(err) @@ -1395,7 +1395,7 @@ func (h *OpenAPIV2) MergeTable(c *gin.Context) { } mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64) - err = maintainer.MergeTable(int64(tableId), mode) + err = maintainer.MergeTable(tableId, mode) if err != nil { log.Error("failed to merge table", zap.Error(err), zap.Int64("tableID", tableId)) _ = c.Error(err) diff --git a/cmd/pulsar-consumer/writer.go b/cmd/pulsar-consumer/writer.go index fbbf94c3aa..a9085d990f 100644 --- a/cmd/pulsar-consumer/writer.go +++ b/cmd/pulsar-consumer/writer.go @@ -104,7 +104,7 @@ func newWriter(ctx context.Context, o *option) *writer { o.replicaConfig.Sink.TiDBSourceID = 1 o.replicaConfig.Sink.Protocol = putil.AddressOf(o.protocol.String()) - for i := 0; i < int(o.partitionNum); i++ { + for i := 0; i < o.partitionNum; i++ { decoder, err := codec.NewEventDecoder(ctx, i, codecConfig, o.topic, db) if err != nil { log.Panic("cannot create the decoder", zap.Error(err)) diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 608a1bf348..836a51450e 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -355,7 +355,7 @@ func newMockEtcdClient(ownerID string) *mockEtcdClient { } func (m *mockEtcdClient) GetOwnerID(ctx context.Context) (config.CaptureID, error) { - return config.CaptureID(m.ownerID), nil + return m.ownerID, nil } func TestCoordinatorScheduling(t *testing.T) { diff --git a/coordinator/create_changefeed_gc_test.go b/coordinator/create_changefeed_gc_test.go index 84ff8f277c..9712df9039 100644 --- a/coordinator/create_changefeed_gc_test.go +++ b/coordinator/create_changefeed_gc_test.go @@ -130,11 +130,11 @@ func TestUpdateGCSafepointCallsGCManagerUpdate(t *testing.T) { if kerneltype.IsClassic() { gcManager.EXPECT(). - TryUpdateServiceGCSafepoint(gomock.Any(), common.Ts(info.StartTs-1)). + TryUpdateServiceGCSafepoint(gomock.Any(), info.StartTs-1). Return(nil).Times(1) } else { gcManager.EXPECT(). - TryUpdateKeyspaceGCBarrier(gomock.Any(), gomock.Any(), gomock.Any(), common.Ts(info.StartTs-1)). + TryUpdateKeyspaceGCBarrier(gomock.Any(), gomock.Any(), gomock.Any(), info.StartTs-1). Return(nil).Times(1) } @@ -263,7 +263,7 @@ func TestConcurrentDeleteLastChangefeedAndCreateNewOneKeepsExpectedGCSafepoint(t TryDeleteServiceGCSafepoint(gomock.Any()). Times(0) gcManager.EXPECT(). - TryUpdateServiceGCSafepoint(gomock.Any(), common.Ts(newInfo.StartTs-1)). + TryUpdateServiceGCSafepoint(gomock.Any(), newInfo.StartTs-1). Return(nil). Times(1) diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 6ac31cf645..f83b2719b9 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -726,7 +726,7 @@ func (c *EventCollector) newCongestionControlMessages() map[node.ID]*event.Conge // get available memory for this dispatcher if pathMemory, exists := changefeedPathMemory[changefeedID][dispatcherID]; exists { - nodeDispatcherMemory[eventServiceID][changefeedID][dispatcherID] = uint64(pathMemory) + nodeDispatcherMemory[eventServiceID][changefeedID][dispatcherID] = pathMemory } return true }) diff --git a/downstreamadapter/eventcollector/helper.go b/downstreamadapter/eventcollector/helper.go index 7c839fd695..a81ea1b880 100644 --- a/downstreamadapter/eventcollector/helper.go +++ b/downstreamadapter/eventcollector/helper.go @@ -106,7 +106,7 @@ func (h *EventsHandler) Handle(stat *dispatcherStat, events ...dispatcher.Dispat stat.handleHandshakeEvent(events[0]) return false default: - log.Panic("unknown event type", zap.Int("type", int(events[0].GetType()))) + log.Panic("unknown event type", zap.Int("type", events[0].GetType())) } return false } @@ -145,7 +145,7 @@ func (h *EventsHandler) GetType(event dispatcher.DispatcherEvent) dynstream.Even case commonEvent.TypeDropEvent: return dynstream.EventType{DataGroup: DataGroupDrop, Property: dynstream.NonBatchable, Droppable: false} default: - log.Panic("unknown event type", zap.Int("type", int(event.GetType()))) + log.Panic("unknown event type", zap.Int("type", event.GetType())) } return dynstream.DefaultEventType } diff --git a/downstreamadapter/sink/mysql/causality/helper.go b/downstreamadapter/sink/mysql/causality/helper.go index 17d56044cf..7aa3d011ce 100644 --- a/downstreamadapter/sink/mysql/causality/helper.go +++ b/downstreamadapter/sink/mysql/causality/helper.go @@ -80,7 +80,7 @@ func genRowKeys(row commonEvent.RowChange, tableInfo *common.TableInfo, dispatch // no concurrence for rows in the same dispatcher. log.Debug("Use dispatcherID as the key", zap.Any("dispatcherID", dispatcherID)) tableKey := make([]byte, 8) - binary.BigEndian.PutUint64(tableKey, uint64(dispatcherID.GetLow())) + binary.BigEndian.PutUint64(tableKey, dispatcherID.GetLow()) keys = [][]byte{tableKey} } return keys diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index 61ba0ee9e9..e32c2cb375 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -1104,7 +1104,7 @@ func TestWriteToEventStore(t *testing.T) { CRTs: 211, // Note: must be different from smallEntry's CRTs to avoid key collision if key is same KeyLen: uint32(len(largeEntryKey)), ValueLen: uint32(len(largeEntryValue)) * uint32(store.compressionThreshold/10), - Key: []byte(largeEntryKey), + Key: largeEntryKey, Value: bytes.Repeat(largeEntryValue, store.compressionThreshold/10), OldValue: nil, } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 394bc62f1f..f78a14b39d 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -761,7 +761,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { for _, tableID := range tableIDs { if store, ok := p.tableInfoStoreMap[tableID]; ok { // do some safety check - switch model.ActionType(job.Type) { + switch job.Type { case model.ActionCreateTable, model.ActionCreateTables: // newly created tables should not be registered before this ddl are handled log.Panic("should not be registered", zap.Int64("tableID", tableID)) @@ -776,7 +776,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { } func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool { - switch model.ActionType(job.Type) { + switch job.Type { // Skipping ActionCreateTable and ActionCreateTables when the table already exists: // 1. It is possible to receive ActionCreateTable and ActionCreateTables multiple times, // and filtering duplicates in a generic way is challenging. diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index 40847d323a..723eb55cf2 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -662,7 +662,7 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 200, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } // provide the last three traffic status for i := 0; i < 3; i++ { @@ -692,13 +692,13 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplica3.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 600, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } status4 := &heartbeatpb.TableSpanStatus{ ID: spanReplica4.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 500, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } // provide the last three traffic status for i := 0; i < 2; i++ { @@ -791,13 +791,13 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplicaID7.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: float32(trafficForSpanReplica7), - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } status8 := &heartbeatpb.TableSpanStatus{ ID: spanReplicaID8.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: float32(trafficForSpanReplica8), - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } // provide the last three traffic status @@ -864,13 +864,13 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplicaID9.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 100, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } status10 := &heartbeatpb.TableSpanStatus{ ID: spanReplicaID10.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 50, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } // provide the last three traffic status @@ -914,7 +914,7 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplicaID11.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 150, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } // provide the last three traffic status @@ -1042,7 +1042,7 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplicaID12.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 400, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } // provide the last three traffic status @@ -1054,7 +1054,7 @@ func TestSplitTableBalanceWhenTrafficUnbalanced(t *testing.T) { ID: spanReplicaID13.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 700, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-5*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-5*time.Second).UnixMilli(), 0), } for i := 0; i < 3; i++ { controller.spanController.UpdateStatus(spanReplica13, status13) diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index f6efa3baf8..c4a405ed20 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -574,5 +574,5 @@ func newMockEtcdClient(ownerID string) *mockEtcdClient { } func (m *mockEtcdClient) GetOwnerID(ctx context.Context) (config.CaptureID, error) { - return config.CaptureID(m.ownerID), nil + return m.ownerID, nil } diff --git a/maintainer/operator/operator_controller_test.go b/maintainer/operator/operator_controller_test.go index a0097029a7..a7497fc372 100644 --- a/maintainer/operator/operator_controller_test.go +++ b/maintainer/operator/operator_controller_test.go @@ -145,7 +145,7 @@ func setAliveNodes(nodeManager *watcher.NodeManager, alive map[node.ID]*node.Inf type nodeMap = map[node.ID]*node.Info v := reflect.ValueOf(nodeManager).Elem().FieldByName("nodes") ptr := (*syncatomic.Pointer[nodeMap])(unsafe.Pointer(v.UnsafeAddr())) - aliveCopy := nodeMap(alive) + aliveCopy := alive ptr.Store(&aliveCopy) } diff --git a/maintainer/replica/split_span_checker.go b/maintainer/replica/split_span_checker.go index 6e700a6c29..180ac8bdf8 100644 --- a/maintainer/replica/split_span_checker.go +++ b/maintainer/replica/split_span_checker.go @@ -316,7 +316,7 @@ func (s *SplitSpanChecker) Check(batch int) replica.GroupCheckResult { if !s.checkAllTaskAvailableLocked() { log.Debug("some task is not available, skip check", zap.String("changefeed", s.changefeedID.String()), - zap.Int64("group", int64(s.groupID)), + zap.Int64("group", s.groupID), ) return results } @@ -388,7 +388,7 @@ func (s *SplitSpanChecker) Check(batch int) replica.GroupCheckResult { if s.regionThreshold > 0 { countByRegion := int(math.Ceil(float64(totalRegionCount) / float64(s.regionThreshold))) if countByRegion > upperSpanCount { - upperSpanCount = int(countByRegion) + upperSpanCount = countByRegion } } @@ -786,7 +786,7 @@ func (s *SplitSpanChecker) chooseMergedSpans(batchSize int) ([]SplitSpanCheckRes if len(mergeSpans) > 1 { log.Info("chooseMergedSpans merge spans", zap.String("changefeed", s.changefeedID.String()), - zap.Int64("group", int64(s.groupID)), + zap.Int64("group", s.groupID), zap.Any("mergeSpans", mergeSpans), zap.Any("node", mergeSpans[0].GetNodeID()), ) diff --git a/maintainer/replica/split_span_checker_test.go b/maintainer/replica/split_span_checker_test.go index 7f5d29912c..92cce0421c 100644 --- a/maintainer/replica/split_span_checker_test.go +++ b/maintainer/replica/split_span_checker_test.go @@ -662,7 +662,7 @@ func TestSplitSpanChecker_CheckBalanceTraffic_Balance(t *testing.T) { // Set region counts for _, spanStatus := range []*splitSpanStatus{spanStatus1, spanStatus2, spanStatus3, spanStatus4} { spanStatus.regionCount = 3 - spanStatus.GetStatus().CheckpointTs = oracle.ComposeTS(int64(time.Now().Add(-10*time.Second).UnixMilli()), 0) + spanStatus.GetStatus().CheckpointTs = oracle.ComposeTS(time.Now().Add(-10*time.Second).UnixMilli(), 0) } checker.balanceCondition.statusUpdated = true @@ -764,9 +764,9 @@ func TestSplitSpanChecker_CheckBalanceTraffic_SplitIfNoMove(t *testing.T) { // Set region counts spanStatus1.regionCount = 5 - spanStatus1.GetStatus().CheckpointTs = oracle.ComposeTS(int64(time.Now().Add(-10*time.Second).UnixMilli()), 0) + spanStatus1.GetStatus().CheckpointTs = oracle.ComposeTS(time.Now().Add(-10*time.Second).UnixMilli(), 0) spanStatus2.regionCount = 5 - spanStatus2.GetStatus().CheckpointTs = oracle.ComposeTS(int64(time.Now().Add(-10*time.Second).UnixMilli()), 0) + spanStatus2.GetStatus().CheckpointTs = oracle.ComposeTS(time.Now().Add(-10*time.Second).UnixMilli(), 0) checker.balanceCondition.statusUpdated = true @@ -975,7 +975,7 @@ func TestSplitSpanChecker_ChooseMergedSpans_Continuous(t *testing.T) { ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 200, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-10*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-10*time.Second).UnixMilli(), 0), } replica.UpdateStatus(status) } @@ -1045,7 +1045,7 @@ func TestSplitSpanChecker_ChooseMoveSpans_SimpleMove(t *testing.T) { ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 200, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-10*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-10*time.Second).UnixMilli(), 0), } replica.UpdateStatus(status) } @@ -1107,7 +1107,7 @@ func TestSplitSpanChecker_ChooseMoveSpans_ExchangeMove(t *testing.T) { ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 100, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-10*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-10*time.Second).UnixMilli(), 0), } replica.UpdateStatus(status) } @@ -1185,7 +1185,7 @@ func TestSplitSpanChecker_Check_FullFlow(t *testing.T) { ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 200, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-10*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-10*time.Second).UnixMilli(), 0), } replica.UpdateStatus(status) } @@ -1261,7 +1261,7 @@ func TestSplitSpanChecker_Check_FullFlow_WriteThresholdZero(t *testing.T) { ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 200, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-10*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-10*time.Second).UnixMilli(), 0), } replica.UpdateStatus(status) } @@ -1337,7 +1337,7 @@ func TestSplitSpanChecker_Check_FullFlow_RegionThresholdZero(t *testing.T) { ID: replica.ID.ToPB(), ComponentStatus: heartbeatpb.ComponentState_Working, EventSizePerSecond: 0, - CheckpointTs: oracle.ComposeTS(int64(currentTime.Add(-10*time.Second).UnixMilli()), 0), + CheckpointTs: oracle.ComposeTS(currentTime.Add(-10*time.Second).UnixMilli(), 0), } replica.UpdateStatus(status) } diff --git a/maintainer/scheduler/balance_splits.go b/maintainer/scheduler/balance_splits.go index 7f808a01a9..08ab66be5e 100644 --- a/maintainer/scheduler/balance_splits.go +++ b/maintainer/scheduler/balance_splits.go @@ -120,7 +120,7 @@ func (s *balanceSplitsScheduler) Execute() time.Time { if len(replications) != s.spanController.GetTaskSizeByGroup(group) { log.Info("here is some spans in the group is not in the replicating state; skip this balance check", zap.String("changefeed", s.changefeedID.String()), - zap.Int64("group", int64(group)), + zap.Int64("group", group), zap.Int("replicatingSize", len(replications)), zap.Int("taskSize", s.spanController.GetTaskSizeByGroup(group))) continue diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index a5e47784cf..5dbc0c60e7 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -618,7 +618,7 @@ func getMockDB(t *testing.T) *sql.DB { mock.ExpectBegin() mock.ExpectExec("INSERT INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). - WithArgs(1, []byte([]byte("20"))). + WithArgs(1, []byte("20")). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() @@ -630,7 +630,7 @@ func getMockDB(t *testing.T) *sql.DB { mock.ExpectBegin() mock.ExpectExec("INSERT INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). - WithArgs(10, []byte([]byte("20"))). + WithArgs(10, []byte("20")). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("INSERT INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(100, []byte("200")). diff --git a/pkg/common/event/active_active.go b/pkg/common/event/active_active.go index 2b91c2e616..9a6f583c75 100644 --- a/pkg/common/event/active_active.go +++ b/pkg/common/event/active_active.go @@ -275,7 +275,7 @@ func filterDMLEventSlowPath( log.Info("received hard delete row", zap.Stringer("dispatcherID", event.DispatcherID), zap.Int64("tableID", tableInfo.TableName.TableID), - zap.Uint64("commitTs", uint64(event.CommitTs))) + zap.Uint64("commitTs", event.CommitTs)) } } filtered = true diff --git a/pkg/common/event/drop_event.go b/pkg/common/event/drop_event.go index 583c92d26e..2e74fefc9f 100644 --- a/pkg/common/event/drop_event.go +++ b/pkg/common/event/drop_event.go @@ -153,7 +153,7 @@ func (e *DropEvent) encodeV1() ([]byte, error) { offset += 8 // DroppedCommitTs - binary.BigEndian.PutUint64(data[offset:], uint64(e.DroppedCommitTs)) + binary.BigEndian.PutUint64(data[offset:], e.DroppedCommitTs) offset += 8 // DroppedEpoch @@ -178,7 +178,7 @@ func (e *DropEvent) decodeV1(data []byte) error { offset += 8 // DroppedCommitTs - e.DroppedCommitTs = common.Ts(binary.BigEndian.Uint64(data[offset:])) + e.DroppedCommitTs = binary.BigEndian.Uint64(data[offset:]) offset += 8 // DroppedEpoch diff --git a/pkg/common/event/drop_event_test.go b/pkg/common/event/drop_event_test.go index d12aa332c1..524772034e 100644 --- a/pkg/common/event/drop_event_test.go +++ b/pkg/common/event/drop_event_test.go @@ -25,7 +25,7 @@ func TestDropEvent(t *testing.T) { e := NewDropEvent(did, 123, 100, 456) data, err := e.Marshal() require.NoError(t, err) - require.Len(t, data, int(e.GetSize())+int(GetEventHeaderSize())) + require.Len(t, data, int(e.GetSize())+GetEventHeaderSize()) var e2 DropEvent err = e2.Unmarshal(data) @@ -110,7 +110,7 @@ func TestDropEventMarshalUnmarshal(t *testing.T) { return } require.NoError(t, err) - require.Len(t, data, int(tc.event.GetSize())+int(GetEventHeaderSize())) + require.Len(t, data, int(tc.event.GetSize())+GetEventHeaderSize()) // Test Unmarshal var e2 DropEvent diff --git a/pkg/common/event/not_reusable_event_test.go b/pkg/common/event/not_reusable_event_test.go index 43d0081c9d..c1a3ebe0e4 100644 --- a/pkg/common/event/not_reusable_event_test.go +++ b/pkg/common/event/not_reusable_event_test.go @@ -26,7 +26,7 @@ func TestNotReusableEvent(t *testing.T) { e := NewNotReusableEvent(did) data, err := e.Marshal() require.NoError(t, err) - require.Len(t, data, int(e.GetSize())+int(GetEventHeaderSize())) + require.Len(t, data, int(e.GetSize())+GetEventHeaderSize()) var e2 NotReusableEvent err = e2.Unmarshal(data) diff --git a/pkg/common/event/ready_event_test.go b/pkg/common/event/ready_event_test.go index e8fde74e46..cf1c45c543 100644 --- a/pkg/common/event/ready_event_test.go +++ b/pkg/common/event/ready_event_test.go @@ -26,7 +26,7 @@ func TestReadyEvent(t *testing.T) { e := NewReadyEvent(did) data, err := e.Marshal() require.NoError(t, err) - require.Len(t, data, int(e.GetSize())+int(GetEventHeaderSize())) + require.Len(t, data, int(e.GetSize())+GetEventHeaderSize()) var e2 ReadyEvent err = e2.Unmarshal(data) diff --git a/pkg/common/event/resolved_ts_event.go b/pkg/common/event/resolved_ts_event.go index 4693d8641b..5174db3eb7 100644 --- a/pkg/common/event/resolved_ts_event.go +++ b/pkg/common/event/resolved_ts_event.go @@ -237,7 +237,7 @@ func (e ResolvedEvent) encodeV1() ([]byte, error) { offset := 0 // ResolvedTs - binary.BigEndian.PutUint64(data[offset:], uint64(e.ResolvedTs)) + binary.BigEndian.PutUint64(data[offset:], e.ResolvedTs) offset += 8 // Epoch @@ -259,7 +259,7 @@ func (e *ResolvedEvent) decodeV1(data []byte) error { offset := 0 // ResolvedTs - e.ResolvedTs = common.Ts(binary.BigEndian.Uint64(data[offset:])) + e.ResolvedTs = binary.BigEndian.Uint64(data[offset:]) offset += 8 // Epoch diff --git a/pkg/common/event/sync_point_event.go b/pkg/common/event/sync_point_event.go index 9ad721c6d5..3256ccb178 100644 --- a/pkg/common/event/sync_point_event.go +++ b/pkg/common/event/sync_point_event.go @@ -167,7 +167,7 @@ func (e SyncPointEvent) encodeV1() ([]byte, error) { offset := 0 // Seq - binary.BigEndian.PutUint64(data[offset:], uint64(e.Seq)) + binary.BigEndian.PutUint64(data[offset:], e.Seq) offset += 8 // Epoch diff --git a/pkg/encryption/tikv_http_client.go b/pkg/encryption/tikv_http_client.go index d2c9d971f1..551188077b 100644 --- a/pkg/encryption/tikv_http_client.go +++ b/pkg/encryption/tikv_http_client.go @@ -356,7 +356,7 @@ func (r *encryptionMetaResponse) toEncryptionMeta() (*EncryptionMeta, error) { dataKeys := make(map[uint32]*DataKey, len(r.DataKeys)) for id, dk := range r.DataKeys { - dataKeys[id] = &DataKey{Ciphertext: []byte(dk.Ciphertext)} + dataKeys[id] = &DataKey{Ciphertext: dk.Ciphertext} } if _, ok := dataKeys[r.Current.DataKeyId]; !ok { @@ -395,7 +395,7 @@ func (r *masterKeyResponse) toMasterKey() *MasterKey { CmekId: r.CmekId, Region: r.Region, Endpoint: r.Endpoint, - Ciphertext: []byte(r.Ciphertext), + Ciphertext: r.Ciphertext, } } diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 7eb5dbd23b..6d3d2704a1 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -691,7 +691,7 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) { } if uint64(sl.maxDMLBytes) > task.availableMemoryQuota.Load() { - log.Debug("dispatcher available memory quota is not enough, skip scan", zap.Stringer("dispatcher", task.id), zap.Uint64("available", task.availableMemoryQuota.Load()), zap.Int64("required", int64(sl.maxDMLBytes))) + log.Debug("dispatcher available memory quota is not enough, skip scan", zap.Stringer("dispatcher", task.id), zap.Uint64("available", task.availableMemoryQuota.Load()), zap.Int64("required", sl.maxDMLBytes)) c.sendSignalResolvedTs(task) metrics.EventServiceSkipScanCount.WithLabelValues("dispatcher_quota").Inc() return diff --git a/pkg/eventservice/test_helper.go b/pkg/eventservice/test_helper.go index 06727c2e96..e0452a9acd 100644 --- a/pkg/eventservice/test_helper.go +++ b/pkg/eventservice/test_helper.go @@ -64,11 +64,11 @@ func (m *mockSchemaStore) Close(ctx context.Context) error { func (m *mockSchemaStore) DeleteTable(id common.TableID, ts common.Ts) { if info, ok := m.TableInfo[id]; ok { - info.deleteVersion = uint64(ts) + info.deleteVersion = ts } else { m.TableInfo[id] = &mockVersionTableInfo{ tableInfos: make([]*common.TableInfo, 0), - deleteVersion: uint64(ts), + deleteVersion: ts, } } } @@ -92,12 +92,12 @@ func (m *mockSchemaStore) SetTables(tables []commonEvent.Table) { func (m *mockSchemaStore) GetTableInfo(keyspaceMeta common.KeyspaceMeta, tableID common.TableID, ts common.Ts) (*common.TableInfo, error) { if info, ok := m.TableInfo[tableID]; ok { - if info.deleteVersion <= uint64(ts) { + if info.deleteVersion <= ts { return nil, &schemastore.TableDeletedError{} } infos := m.TableInfo[tableID].tableInfos idx := sort.Search(len(infos), func(i int) bool { - return infos[i].GetUpdateTS() > uint64(ts) + return infos[i].GetUpdateTS() > ts }) if idx == 0 { return nil, nil diff --git a/pkg/logger/log_file_monitor.go b/pkg/logger/log_file_monitor.go index 5058b96048..328f12b2a5 100644 --- a/pkg/logger/log_file_monitor.go +++ b/pkg/logger/log_file_monitor.go @@ -124,8 +124,8 @@ func collectDiskTotalUsedBytes(path string) (totalBytes, usedBytes uint64) { if err := syscall.Statfs(path, &st); err != nil { return 0, 0 } - totalBytes = uint64(st.Blocks) * uint64(st.Bsize) - freeBytes := uint64(st.Bfree) * uint64(st.Bsize) + totalBytes = st.Blocks * uint64(st.Bsize) + freeBytes := st.Bfree * uint64(st.Bsize) usedBytes = totalBytes - freeBytes return totalBytes, usedBytes } diff --git a/pkg/messaging/helper.go b/pkg/messaging/helper.go index 0c2cf832f0..fdaca7da6a 100644 --- a/pkg/messaging/helper.go +++ b/pkg/messaging/helper.go @@ -57,5 +57,5 @@ func NewMessageCenterForTest(t *testing.T) (*messageCenter, string, func()) { cancel() wg.Wait() } - return mc, string(addr), stop + return mc, addr, stop } diff --git a/pkg/messaging/remote_target.go b/pkg/messaging/remote_target.go index 852ebe6336..b8a33d009e 100644 --- a/pkg/messaging/remote_target.go +++ b/pkg/messaging/remote_target.go @@ -265,7 +265,7 @@ func (s *remoteMessageTarget) connect() error { zap.Stringer("remoteID", s.targetId), zap.String("remoteAddr", s.targetAddr)) - conn, err := conn.Connect(string(s.targetAddr), s.security) + conn, err := conn.Connect(s.targetAddr, s.security) if err != nil { log.Info("Cannot create grpc client", zap.Any("localID", s.messageCenterID), @@ -669,7 +669,7 @@ func (s *remoteMessageTarget) newMessage(msg ...*TargetMessage) *proto.Message { protoMsg := &proto.Message{ From: string(s.messageCenterID), To: string(s.targetId), - Topic: string(msg[0].Topic), + Topic: msg[0].Topic, Type: int32(msg[0].Type), Payload: msgBytes, } diff --git a/pkg/orchestrator/reactor_state_capture_test.go b/pkg/orchestrator/reactor_state_capture_test.go index 032368e7f7..4ee9c77d83 100644 --- a/pkg/orchestrator/reactor_state_capture_test.go +++ b/pkg/orchestrator/reactor_state_capture_test.go @@ -90,13 +90,13 @@ func mustUpdateCapture( data, err := info.Marshal() require.NoError(t, err) - err = state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID))), data, false) + err = state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, captureID)), data, false) require.NoError(t, err) } func mustDeleteCapture(t *testing.T, state *GlobalReactorState, captureID config.CaptureID) { t.Helper() - err := state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID))), nil, false) + err := state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, captureID)), nil, false) require.NoError(t, err) } diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go index a709338554..ad511fac37 100644 --- a/pkg/pdutil/clock.go +++ b/pkg/pdutil/clock.go @@ -186,7 +186,7 @@ func (c *clockWithValue4Test) CurrentTime() time.Time { } func (c *clockWithValue4Test) CurrentTS() uint64 { - return oracle.ComposeTS(int64(c.value.UnixNano()), 0) + return oracle.ComposeTS(c.value.UnixNano(), 0) } func (c *clockWithValue4Test) Run(ctx context.Context) { diff --git a/pkg/redo/testutil/config.go b/pkg/redo/testutil/config.go index ef1cfe35b1..2579a9a6bb 100644 --- a/pkg/redo/testutil/config.go +++ b/pkg/redo/testutil/config.go @@ -22,7 +22,7 @@ import ( func NewConsistentConfig(storage string) *config.ConsistentConfig { level := string(redo.ConsistentLevelEventual) - maxLogSize := int64(redo.DefaultMaxLogSize) + maxLogSize := redo.DefaultMaxLogSize flushIntervalInMs := int64(redo.DefaultFlushIntervalInMs) metaFlushIntervalInMs := int64(redo.MinFlushIntervalInMs) encodingWorkerNum := redo.DefaultEncodingWorkerNum diff --git a/pkg/scheduler/replica/replication.go b/pkg/scheduler/replica/replication.go index 2cab19c294..204e56c995 100644 --- a/pkg/scheduler/replica/replication.go +++ b/pkg/scheduler/replica/replication.go @@ -391,7 +391,7 @@ func (db *replicationDB[T, R]) maybeRemoveGroup(g *replicationGroup[T, R]) { log.Info("scheduler: remove task group", zap.String("schedulerID", db.id), zap.String("group", GetGroupName(g.groupID)), zap.Stringer("groupType", GroupType(g.groupID>>56))) - zap.Int64("groupID", int64(g.groupID)) + zap.Int64("groupID", g.groupID) } func (db *replicationDB[T, R]) mustGetGroup(groupID GroupID) *replicationGroup[T, R] { diff --git a/server/module_election.go b/server/module_election.go index 7b6ab54c4f..b9c781f8e4 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/ticdc/coordinator/changefeed" logcoordinator "github.com/pingcap/ticdc/logservice/coordinator" "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/liveness" @@ -126,7 +125,7 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { return nil } - coordinatorVersion, err := e.svr.EtcdClient.GetOwnerRevision(ctx, config.CaptureID(nodeID)) + coordinatorVersion, err := e.svr.EtcdClient.GetOwnerRevision(ctx, nodeID) if err != nil { return errors.Trace(err) } @@ -247,7 +246,7 @@ func (e *elector) campaignLogCoordinator(ctx context.Context) error { return nil } - logCoordinatorVersion, err := e.svr.EtcdClient.GetLogCoordinatorRevision(ctx, config.CaptureID(nodeID)) + logCoordinatorVersion, err := e.svr.EtcdClient.GetLogCoordinatorRevision(ctx, nodeID) if err != nil { return errors.Trace(err) } diff --git a/utils/dynstream/event_queue.go b/utils/dynstream/event_queue.go index e460ab9018..ecd5e23ab9 100644 --- a/utils/dynstream/event_queue.go +++ b/utils/dynstream/event_queue.go @@ -82,7 +82,7 @@ func (q *eventQueue[A, P, T, D, H]) releasePath(path *pathInfo[A, P, T, D, H]) { } if path.areaMemStat != nil { - path.areaMemStat.decPendingSize(path, int64(path.pendingSize.Load())) + path.areaMemStat.decPendingSize(path, path.pendingSize.Load()) path.areaMemStat.lastSizeDecreaseTime.Store(time.Now()) } diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go index 93af18b8ed..f31dff997f 100644 --- a/utils/dynstream/memory_control.go +++ b/utils/dynstream/memory_control.go @@ -207,7 +207,7 @@ func (as *areaMemStat[A, P, T, D, H]) releaseMemory() { continue } - releasedSize += int64(path.pendingSize.Load()) + releasedSize += path.pendingSize.Load() releasedPaths = append(releasedPaths, path) } @@ -286,7 +286,7 @@ func (as *areaMemStat[A, P, T, D, H]) updateAreaPauseState(path *pathInfo[A, P, } func (as *areaMemStat[A, P, T, D, H]) decPendingSize(path *pathInfo[A, P, T, D, H], size int64) { - as.totalPendingSize.Add(int64(-size)) + as.totalPendingSize.Add(-size) if as.totalPendingSize.Load() < 0 { log.Debug("Total pending size is less than 0, reset it to 0", zap.Int64("totalPendingSize", as.totalPendingSize.Load()), zap.String("component", as.settings.Load().component)) as.totalPendingSize.Store(0)