Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,13 @@ type MigrationContext struct {

// move tables:
MoveTables struct {
TableNames []string // List of table names to be moved.
TargetHost string // Target hostname for the move. This must be a primary/writable host.
TargetPort int // Target MySQL port for the move.
TargetUser string // Target username for the move. If not specified, it will default to the source user.
TargetPass string // Target password for the move. If not specified, it will default to the source password.
TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name.
TableNames []string // List of table names to be moved.
TargetHost string // Target hostname for the move. This must be a primary/writable host.
TargetPort int // Target MySQL port for the move.
TargetUser string // Target username for the move. If not specified, it will default to the source user.
TargetPass string // Target password for the move. If not specified, it will default to the source password.
TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name.
ConnectionConfig *mysql.ConnectionConfig
}

Log Logger
Expand Down Expand Up @@ -353,6 +354,9 @@ func (mctx *MigrationContext) SetConnectionConfig(storageEngine string) error {
}
mctx.InspectorConnectionConfig.TransactionIsolation = transactionIsolation
mctx.ApplierConnectionConfig.TransactionIsolation = transactionIsolation
if mctx.MoveTables.ConnectionConfig != nil {
mctx.MoveTables.ConnectionConfig.TransactionIsolation = transactionIsolation
}
return nil
}

Expand All @@ -363,6 +367,9 @@ func (mctx *MigrationContext) SetConnectionCharset(charset string) {

mctx.InspectorConnectionConfig.Charset = charset
mctx.ApplierConnectionConfig.Charset = charset
if mctx.MoveTables.ConnectionConfig != nil {
mctx.MoveTables.ConnectionConfig.Charset = charset
}
}

func getSafeTableName(baseName string, suffix string) string {
Expand Down
50 changes: 38 additions & 12 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder

moveTablesTargetDB *gosql.DB
moveTablesConnectionConfig *mysql.ConnectionConfig
moveTablesCopySelectFirstQueryBuilder *sql.MoveTableCopySelectQueryBuilder
moveTablesCopySelectNextQueryBuilder *sql.MoveTableCopySelectQueryBuilder
moveTablesCopyInsertQueryBuilder *sql.MoveTableCopyInsertQueryBuilder
Expand All @@ -96,6 +97,8 @@
migrationContext: migrationContext,
finishedMigrating: 0,
name: "applier",

moveTablesConnectionConfig: migrationContext.MoveTables.ConnectionConfig,
}
}

Expand Down Expand Up @@ -146,31 +149,47 @@
if err := apl.readTableColumns(); err != nil {
return err
}
if apl.moveTablesConnectionConfig != nil {
moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.MoveTables.TargetDatabase) + "&multiStatements=true"
if apl.moveTablesTargetDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, moveTablesURI); err != nil {
return err
}
if _, err := base.ValidateConnection(apl.moveTablesTargetDB, apl.moveTablesConnectionConfig, apl.migrationContext, apl.name); err != nil {
return err
}
}
apl.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", apl.connectionConfig.ImpliedKey, apl.migrationContext.ApplierMySQLVersion)
return nil
}

func (apl *Applier) prepareQueries() (err error) {
targetDatabaseName := apl.migrationContext.DatabaseName
targetTableName := apl.migrationContext.GetGhostTableName()
if apl.migrationContext.IsMoveTablesMode() {
targetDatabaseName = apl.migrationContext.MoveTables.TargetDatabase
targetTableName = apl.migrationContext.OriginalTableName
}

if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.GetGhostTableName(),
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
&apl.migrationContext.UniqueKey.Columns,
); err != nil {
return err
}
if apl.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.GetGhostTableName(),
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.SharedColumns,
apl.migrationContext.MappedSharedColumns,
); err != nil {
return err
}
if apl.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.GetGhostTableName(),
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.SharedColumns,
apl.migrationContext.MappedSharedColumns,
Expand All @@ -191,7 +210,7 @@
if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
apl.migrationContext.SharedColumns,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
true, // <-- include start range values for first select query
Expand All @@ -201,17 +220,17 @@
if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder(
apl.migrationContext.DatabaseName,
apl.migrationContext.OriginalTableName,
apl.migrationContext.SharedColumns,
apl.migrationContext.OriginalTableColumns,
apl.migrationContext.UniqueKey.Name,
&apl.migrationContext.UniqueKey.Columns,
false,
); err != nil {
return err
}
if apl.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder(
apl.migrationContext.MoveTables.TargetDatabase,
apl.migrationContext.OriginalTableName,
apl.migrationContext.SharedColumns,
targetDatabaseName,
targetTableName,
apl.migrationContext.OriginalTableColumns,
); err != nil {
return err
}
Expand Down Expand Up @@ -1068,7 +1087,7 @@
if err != nil {
return nil, err
}
sqlRows, err := apl.db.Query(query, explodedArgs...)

Check failure on line 1090 in go/logic/applier.go

View workflow job for this annotation

GitHub Actions / lint

rows.Err must be checked (rowserrcheck)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1781,7 +1800,11 @@
ctx := context.Background()

err := func() error {
conn, err := apl.db.Conn(ctx)
db := apl.db
if apl.migrationContext.IsMoveTablesMode() {
db = apl.moveTablesTargetDB
}
conn, err := db.Conn(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1885,6 +1908,9 @@
apl.migrationContext.Log.Debugf("Tearing down...")
apl.db.Close()
apl.singletonDB.Close()
if apl.moveTablesTargetDB != nil {
apl.moveTablesTargetDB.Close()
}
atomic.StoreInt64(&apl.finishedMigrating, 1)
}

Expand Down
71 changes: 69 additions & 2 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@
suite.Require().NoError(err)

// Second database & connection for move-tables tests:
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testMysqlDatabaseOther))

Check failure on line 304 in go/logic/applier_test.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
otherConf := drivermysql.NewConfig()
otherConf.DBName = testMysqlDatabaseOther
otherConf.User = testMysqlUser
Expand Down Expand Up @@ -332,6 +332,8 @@
suite.Require().NoError(err)
_, err = suite.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestGhostTableName())
suite.Require().NoError(err)
_, err = suite.otherDB.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOtherTableName())
suite.Require().NoError(err)
}

func (suite *ApplierTestSuite) TestInitDBConnections() {
Expand Down Expand Up @@ -1561,6 +1563,72 @@
// Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back
}

func (suite *ApplierTestSuite) TestApplyDMLEventQueriesMoveTablesMode() {
ctx := context.Background()
var err error

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)
_, err = suite.otherDB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestOtherTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.MoveTables.ConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "primary_key",
Columns: *sql.NewColumnList([]string{"id"}),
}
migrationContext.MoveTables.TableNames = []string{testMysqlTableName}
migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

dmlEvents := []*binlog.BinlogDMLEvent{
{
DatabaseName: testMysqlDatabase,
TableName: testMysqlTableName,
DML: binlog.InsertDML,
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
},
}
err = applier.ApplyDMLEventQueries(dmlEvents)
suite.Require().NoError(err)

// Check that the row was inserted into the ghost table via moveTablesTargetDB
rows, err := suite.otherDB.Query("SELECT * FROM " + getTestOtherTableName())
suite.Require().NoError(err)
defer rows.Close()

var count, id, item_id int
for rows.Next() {
err = rows.Scan(&id, &item_id)
suite.Require().NoError(err)
count += 1
}
suite.Require().NoError(rows.Err())

suite.Require().Equal(1, count)
suite.Require().Equal(123456, id)
suite.Require().Equal(42, item_id)

suite.Require().Equal(int64(1), migrationContext.TotalDMLEventsApplied)
suite.Require().Equal(int64(0), migrationContext.RowsDeltaEstimate)
}

func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() {
ctx := context.Background()
var err error
Expand All @@ -1577,6 +1645,7 @@

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.MoveTables.ConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name", "created_at"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"})
Expand All @@ -1595,8 +1664,6 @@
err = applier.InitDBConnections()
suite.Require().NoError(err)

applier.moveTablesTargetDB = suite.otherDB

err = applier.CreateChangelogTable()
suite.Require().NoError(err)

Expand Down
24 changes: 12 additions & 12 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,14 @@ type MoveTableCopySelectQueryBuilder struct {
argsCount int
}

func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, sharedColumns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) {
func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, columns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) {
sourceDatabaseName = EscapeName(sourceDatabaseName)
sourceTableName = EscapeName(sourceTableName)
sharedColumnsNames := sharedColumns.Names()
for i := range sharedColumnsNames {
sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i])
columnNames := columns.Names()
for i := range columnNames {
columnNames[i] = EscapeName(columnNames[i])
}
sharedColumnsListing := strings.Join(sharedColumnsNames, ", ")
sharedColumnsListing := strings.Join(columnNames, ", ")
uniqueKey = EscapeName(uniqueKey)
var minRangeComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
Expand Down Expand Up @@ -422,16 +422,16 @@ type MoveTableCopyInsertQueryBuilder struct {
valueListSize int
}

func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, sharedColumns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) {
func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, columns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) {
targetDatabaseName = EscapeName(targetDatabaseName)
targetTableName = EscapeName(targetTableName)
sharedColumnsNames := sharedColumns.Names()
for i := range sharedColumnsNames {
sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i])
columnsNames := columns.Names()
for i := range columnsNames {
columnsNames[i] = EscapeName(columnsNames[i])
}
sharedColumnsListing := strings.Join(sharedColumnsNames, ", ")
valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(sharedColumns), ", ") + ")"
valueListSize := len(sharedColumnsNames)
sharedColumnsListing := strings.Join(columnsNames, ", ")
valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(columns), ", ") + ")"
valueListSize := len(columnsNames)
stmt := fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore
into
Expand Down
Loading