diff --git a/go/base/context.go b/go/base/context.go index 8a6851e23..370e8c29b 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -273,12 +273,13 @@ type MigrationContext struct { // move tables: MoveTables struct { - TableNames []string // List of table names to be moved. - TargetHost string // Target hostname for the move. This must be a primary/writable host. - TargetPort int // Target MySQL port for the move. - TargetUser string // Target username for the move. If not specified, it will default to the source user. - TargetPass string // Target password for the move. If not specified, it will default to the source password. - TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. + TableNames []string // List of table names to be moved. + TargetHost string // Target hostname for the move. This must be a primary/writable host. + TargetPort int // Target MySQL port for the move. + TargetUser string // Target username for the move. If not specified, it will default to the source user. + TargetPass string // Target password for the move. If not specified, it will default to the source password. + TargetDatabase string // Target database name for the move. If not specified, it will default to the source database name. + ConnectionConfig *mysql.ConnectionConfig } Log Logger @@ -353,6 +354,9 @@ func (mctx *MigrationContext) SetConnectionConfig(storageEngine string) error { } mctx.InspectorConnectionConfig.TransactionIsolation = transactionIsolation mctx.ApplierConnectionConfig.TransactionIsolation = transactionIsolation + if mctx.MoveTables.ConnectionConfig != nil { + mctx.MoveTables.ConnectionConfig.TransactionIsolation = transactionIsolation + } return nil } @@ -363,6 +367,9 @@ func (mctx *MigrationContext) SetConnectionCharset(charset string) { mctx.InspectorConnectionConfig.Charset = charset mctx.ApplierConnectionConfig.Charset = charset + if mctx.MoveTables.ConnectionConfig != nil { + mctx.MoveTables.ConnectionConfig.Charset = charset + } } func getSafeTableName(baseName string, suffix string) string { diff --git a/go/logic/applier.go b/go/logic/applier.go index 9d9d2ed4b..5aa82f448 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -85,6 +85,7 @@ type Applier struct { checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder moveTablesTargetDB *gosql.DB + moveTablesConnectionConfig *mysql.ConnectionConfig moveTablesCopySelectFirstQueryBuilder *sql.MoveTableCopySelectQueryBuilder moveTablesCopySelectNextQueryBuilder *sql.MoveTableCopySelectQueryBuilder moveTablesCopyInsertQueryBuilder *sql.MoveTableCopyInsertQueryBuilder @@ -96,6 +97,8 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { migrationContext: migrationContext, finishedMigrating: 0, name: "applier", + + moveTablesConnectionConfig: migrationContext.MoveTables.ConnectionConfig, } } @@ -146,22 +149,38 @@ func (apl *Applier) InitDBConnections() (err error) { if err := apl.readTableColumns(); err != nil { return err } + if apl.moveTablesConnectionConfig != nil { + moveTablesURI := apl.moveTablesConnectionConfig.GetDBUri(apl.migrationContext.MoveTables.TargetDatabase) + "&multiStatements=true" + if apl.moveTablesTargetDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, moveTablesURI); err != nil { + return err + } + if _, err := base.ValidateConnection(apl.moveTablesTargetDB, apl.moveTablesConnectionConfig, apl.migrationContext, apl.name); err != nil { + return err + } + } apl.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", apl.connectionConfig.ImpliedKey, apl.migrationContext.ApplierMySQLVersion) return nil } func (apl *Applier) prepareQueries() (err error) { + targetDatabaseName := apl.migrationContext.DatabaseName + targetTableName := apl.migrationContext.GetGhostTableName() + if apl.migrationContext.IsMoveTablesMode() { + targetDatabaseName = apl.migrationContext.MoveTables.TargetDatabase + targetTableName = apl.migrationContext.OriginalTableName + } + if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder( - apl.migrationContext.DatabaseName, - apl.migrationContext.GetGhostTableName(), + targetDatabaseName, + targetTableName, apl.migrationContext.OriginalTableColumns, &apl.migrationContext.UniqueKey.Columns, ); err != nil { return err } if apl.dmlInsertQueryBuilder, err = sql.NewDMLInsertQueryBuilder( - apl.migrationContext.DatabaseName, - apl.migrationContext.GetGhostTableName(), + targetDatabaseName, + targetTableName, apl.migrationContext.OriginalTableColumns, apl.migrationContext.SharedColumns, apl.migrationContext.MappedSharedColumns, @@ -169,8 +188,8 @@ func (apl *Applier) prepareQueries() (err error) { return err } if apl.dmlUpdateQueryBuilder, err = sql.NewDMLUpdateQueryBuilder( - apl.migrationContext.DatabaseName, - apl.migrationContext.GetGhostTableName(), + targetDatabaseName, + targetTableName, apl.migrationContext.OriginalTableColumns, apl.migrationContext.SharedColumns, apl.migrationContext.MappedSharedColumns, @@ -191,7 +210,7 @@ func (apl *Applier) prepareQueries() (err error) { if apl.moveTablesCopySelectFirstQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, - apl.migrationContext.SharedColumns, + apl.migrationContext.OriginalTableColumns, apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, true, // <-- include start range values for first select query @@ -201,7 +220,7 @@ func (apl *Applier) prepareQueries() (err error) { if apl.moveTablesCopySelectNextQueryBuilder, err = sql.NewMoveTableCopySelectQueryBuilder( apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, - apl.migrationContext.SharedColumns, + apl.migrationContext.OriginalTableColumns, apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, false, @@ -209,9 +228,9 @@ func (apl *Applier) prepareQueries() (err error) { return err } if apl.moveTablesCopyInsertQueryBuilder, err = sql.NewMoveTableCopyInsertQueryBuilder( - apl.migrationContext.MoveTables.TargetDatabase, - apl.migrationContext.OriginalTableName, - apl.migrationContext.SharedColumns, + targetDatabaseName, + targetTableName, + apl.migrationContext.OriginalTableColumns, ); err != nil { return err } @@ -1781,7 +1800,11 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e ctx := context.Background() err := func() error { - conn, err := apl.db.Conn(ctx) + db := apl.db + if apl.migrationContext.IsMoveTablesMode() { + db = apl.moveTablesTargetDB + } + conn, err := db.Conn(ctx) if err != nil { return err } @@ -1885,6 +1908,9 @@ func (apl *Applier) Teardown() { apl.migrationContext.Log.Debugf("Tearing down...") apl.db.Close() apl.singletonDB.Close() + if apl.moveTablesTargetDB != nil { + apl.moveTablesTargetDB.Close() + } atomic.StoreInt64(&apl.finishedMigrating, 1) } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 72c42c75f..410e3456b 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -332,6 +332,8 @@ func (suite *ApplierTestSuite) TearDownTest() { suite.Require().NoError(err) _, err = suite.db.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestGhostTableName()) suite.Require().NoError(err) + _, err = suite.otherDB.ExecContext(ctx, "DROP TABLE IF EXISTS "+getTestOtherTableName()) + suite.Require().NoError(err) } func (suite *ApplierTestSuite) TestInitDBConnections() { @@ -1561,6 +1563,72 @@ func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() { // Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back } +func (suite *ApplierTestSuite) TestApplyDMLEventQueriesMoveTablesMode() { + ctx := context.Background() + var err error + + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.otherDB.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestOtherTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "primary_key", + Columns: *sql.NewColumnList([]string{"id"}), + } + migrationContext.MoveTables.TableNames = []string{testMysqlTableName} + migrationContext.MoveTables.TargetDatabase = testMysqlDatabaseOther + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}), + }, + } + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().NoError(err) + + // Check that the row was inserted into the ghost table via moveTablesTargetDB + rows, err := suite.otherDB.Query("SELECT * FROM " + getTestOtherTableName()) + suite.Require().NoError(err) + defer rows.Close() + + var count, id, item_id int + for rows.Next() { + err = rows.Scan(&id, &item_id) + suite.Require().NoError(err) + count += 1 + } + suite.Require().NoError(rows.Err()) + + suite.Require().Equal(1, count) + suite.Require().Equal(123456, id) + suite.Require().Equal(42, item_id) + + suite.Require().Equal(int64(1), migrationContext.TotalDMLEventsApplied) + suite.Require().Equal(int64(0), migrationContext.RowsDeltaEstimate) +} + func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { ctx := context.Background() var err error @@ -1577,6 +1645,7 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { migrationContext := newTestMigrationContext() migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.MoveTables.ConnectionConfig = connectionConfig migrationContext.SetConnectionConfig("innodb") migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name", "created_at"}) @@ -1595,8 +1664,6 @@ func (suite *ApplierTestSuite) TestApplyIterationMoveTableCopyQueries() { err = applier.InitDBConnections() suite.Require().NoError(err) - applier.moveTablesTargetDB = suite.otherDB - err = applier.CreateChangelogTable() suite.Require().NoError(err) diff --git a/go/sql/builder.go b/go/sql/builder.go index 79c19de56..de60b25e3 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -336,14 +336,14 @@ type MoveTableCopySelectQueryBuilder struct { argsCount int } -func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, sharedColumns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) { +func NewMoveTableCopySelectQueryBuilder(sourceDatabaseName, sourceTableName string, columns *ColumnList, uniqueKey string, uniqueKeyColumns *ColumnList, includeRangeStartValues bool) (*MoveTableCopySelectQueryBuilder, error) { sourceDatabaseName = EscapeName(sourceDatabaseName) sourceTableName = EscapeName(sourceTableName) - sharedColumnsNames := sharedColumns.Names() - for i := range sharedColumnsNames { - sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i]) + columnNames := columns.Names() + for i := range columnNames { + columnNames[i] = EscapeName(columnNames[i]) } - sharedColumnsListing := strings.Join(sharedColumnsNames, ", ") + sharedColumnsListing := strings.Join(columnNames, ", ") uniqueKey = EscapeName(uniqueKey) var minRangeComparisonSign = GreaterThanComparisonSign if includeRangeStartValues { @@ -422,16 +422,16 @@ type MoveTableCopyInsertQueryBuilder struct { valueListSize int } -func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, sharedColumns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) { +func NewMoveTableCopyInsertQueryBuilder(targetDatabaseName, targetTableName string, columns *ColumnList) (*MoveTableCopyInsertQueryBuilder, error) { targetDatabaseName = EscapeName(targetDatabaseName) targetTableName = EscapeName(targetTableName) - sharedColumnsNames := sharedColumns.Names() - for i := range sharedColumnsNames { - sharedColumnsNames[i] = EscapeName(sharedColumnsNames[i]) + columnsNames := columns.Names() + for i := range columnsNames { + columnsNames[i] = EscapeName(columnsNames[i]) } - sharedColumnsListing := strings.Join(sharedColumnsNames, ", ") - valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(sharedColumns), ", ") + ")" - valueListSize := len(sharedColumnsNames) + sharedColumnsListing := strings.Join(columnsNames, ", ") + valueListPlaceholder := "(" + strings.Join(buildColumnsPreparedValues(columns), ", ") + ")" + valueListSize := len(columnsNames) stmt := fmt.Sprintf(` insert /* gh-ost %s.%s */ ignore into