diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 2012c8c4c..2f7c5032a 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -78,6 +78,16 @@ See also: [`resuming-migrations`](resume.md) `--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300. +### chunk-concurrent-size + +`--chunk-concurrent-size=1`, the number of goroutines to execute chunk-copy operations concurrently in each copy time slot. Default `1` (sequential). Minimum `1`. + +When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table. + +Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies. + +Note: concurrency multiplies write pressure per time slot. Throttling (`--max-load`, `--nice-ratio`) applies per batch, not per chunk. Start with small values (2-8) and monitor replication lag. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: diff --git a/go/base/context.go b/go/base/context.go index 617e5bb13..521cd0454 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -26,6 +26,16 @@ import ( "github.com/go-ini/ini" ) +// IterationRangeValues holds the range boundaries for a single chunk-copy iteration. +// Used by concurrent row-copy to pass isolated range values to each worker goroutine. +type IterationRangeValues struct { + Min *sql.ColumnValues + Max *sql.ColumnValues + Size int64 + IncludeMinValues bool + HasFurtherRange bool +} + // RowsEstimateMethod is the type of row number estimation type RowsEstimateMethod string @@ -130,6 +140,7 @@ type MigrationContext struct { HeartbeatIntervalMilliseconds int64 defaultNumRetries int64 ChunkSize int64 + ChunkConcurrentSize int64 niceRatio float64 MaxLagMillisecondsThrottleThreshold int64 throttleControlReplicaKeys *mysql.InstanceKeyMap @@ -237,27 +248,28 @@ type MigrationContext struct { AbortError error abortMutex *sync.Mutex - OriginalTableColumnsOnApplier *sql.ColumnList - OriginalTableColumns *sql.ColumnList - OriginalTableVirtualColumns *sql.ColumnList - OriginalTableUniqueKeys [](*sql.UniqueKey) - OriginalTableAutoIncrement uint64 - GhostTableColumns *sql.ColumnList - GhostTableVirtualColumns *sql.ColumnList - GhostTableUniqueKeys [](*sql.UniqueKey) - UniqueKey *sql.UniqueKey - SharedColumns *sql.ColumnList - ColumnRenameMap map[string]string - DroppedColumnsMap map[string]bool - MappedSharedColumns *sql.ColumnList - MigrationLastInsertSQLWarnings []string - MigrationRangeMinValues *sql.ColumnValues - MigrationRangeMaxValues *sql.ColumnValues - Iteration int64 - MigrationIterationRangeMinValues *sql.ColumnValues - MigrationIterationRangeMaxValues *sql.ColumnValues - InitialStreamerCoords mysql.BinlogCoordinates - ForceTmpTableName string + OriginalTableColumnsOnApplier *sql.ColumnList + OriginalTableColumns *sql.ColumnList + OriginalTableVirtualColumns *sql.ColumnList + OriginalTableUniqueKeys [](*sql.UniqueKey) + OriginalTableAutoIncrement uint64 + GhostTableColumns *sql.ColumnList + GhostTableVirtualColumns *sql.ColumnList + GhostTableUniqueKeys [](*sql.UniqueKey) + UniqueKey *sql.UniqueKey + SharedColumns *sql.ColumnList + ColumnRenameMap map[string]string + DroppedColumnsMap map[string]bool + MappedSharedColumns *sql.ColumnList + MigrationLastInsertSQLWarnings []string + MigrationRangeMinValues *sql.ColumnValues + MigrationRangeMaxValues *sql.ColumnValues + Iteration int64 + MigrationIterationRangeMinValues *sql.ColumnValues + MigrationIterationRangeMaxValues *sql.ColumnValues + CalculateNextIterationRangeEndValuesLock *sync.Mutex + InitialStreamerCoords mysql.BinlogCoordinates + ForceTmpTableName string IncludeTriggers bool RemoveTriggerSuffix bool @@ -307,29 +319,31 @@ type ContextConfig struct { func NewMigrationContext() *MigrationContext { ctx, cancelFunc := context.WithCancel(context.Background()) return &MigrationContext{ - Uuid: uuid.NewString(), - defaultNumRetries: 60, - ChunkSize: 1000, - InspectorConnectionConfig: mysql.NewConnectionConfig(), - ApplierConnectionConfig: mysql.NewConnectionConfig(), - MaxLagMillisecondsThrottleThreshold: 1500, - CutOverLockTimeoutSeconds: 3, - DMLBatchSize: 10, - etaNanoseonds: ETAUnknown, - maxLoad: NewLoadMap(), - criticalLoad: NewLoadMap(), - throttleMutex: &sync.Mutex{}, - throttleHTTPMutex: &sync.Mutex{}, - throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), - configMutex: &sync.Mutex{}, - pointOfInterestTimeMutex: &sync.Mutex{}, - lastHeartbeatOnChangelogMutex: &sync.Mutex{}, - ColumnRenameMap: make(map[string]string), - PanicAbort: make(chan error), - ctx: ctx, - cancelFunc: cancelFunc, - abortMutex: &sync.Mutex{}, - Log: NewDefaultLogger(), + Uuid: uuid.NewString(), + defaultNumRetries: 60, + ChunkSize: 1000, + ChunkConcurrentSize: 1, + InspectorConnectionConfig: mysql.NewConnectionConfig(), + ApplierConnectionConfig: mysql.NewConnectionConfig(), + MaxLagMillisecondsThrottleThreshold: 1500, + CutOverLockTimeoutSeconds: 3, + DMLBatchSize: 10, + etaNanoseonds: ETAUnknown, + maxLoad: NewLoadMap(), + criticalLoad: NewLoadMap(), + throttleMutex: &sync.Mutex{}, + throttleHTTPMutex: &sync.Mutex{}, + throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), + configMutex: &sync.Mutex{}, + pointOfInterestTimeMutex: &sync.Mutex{}, + lastHeartbeatOnChangelogMutex: &sync.Mutex{}, + CalculateNextIterationRangeEndValuesLock: &sync.Mutex{}, + ColumnRenameMap: make(map[string]string), + PanicAbort: make(chan error), + ctx: ctx, + cancelFunc: cancelFunc, + abortMutex: &sync.Mutex{}, + Log: NewDefaultLogger(), } } @@ -690,6 +704,13 @@ func (mctx *MigrationContext) SetChunkSize(chunkSize int64) { atomic.StoreInt64(&mctx.ChunkSize, chunkSize) } +func (mctx *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) { + if chunkConcurrentSize < 1 { + chunkConcurrentSize = 1 + } + atomic.StoreInt64(&mctx.ChunkConcurrentSize, chunkConcurrentSize) +} + func (mctx *MigrationContext) SetDMLBatchSize(batchSize int64) { if batchSize < 1 { batchSize = 1 diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 567137fd5..fb15033f4 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -107,6 +107,7 @@ func main() { flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').") exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") + chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "number of goroutines to execute chunks concurrently in each copy time slot (range 1-100)") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss") @@ -355,6 +356,7 @@ func main() { migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) + migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize) migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetThrottleQuery(*throttleQuery) diff --git a/go/logic/applier.go b/go/logic/applier.go index b49e131b8..541677a76 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -115,6 +115,11 @@ func (apl *Applier) InitDBConnections() (err error) { if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil { return err } + concurrentSize := atomic.LoadInt64(&apl.migrationContext.ChunkConcurrentSize) + if concurrentSize > int64(mysql.MaxDBPoolConnections) { + apl.db.SetMaxOpenConns(int(concurrentSize) + mysql.MaxDBPoolConnections) + apl.db.SetMaxIdleConns(int(concurrentSize) + mysql.MaxDBPoolConnections) + } singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri) if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil { return err @@ -870,10 +875,40 @@ func (apl *Applier) ReadMigrationRangeValues() error { } // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, -// which will be used for copying the next chunk of rows. Ir returns "false" if there is -// no further chunk to work through, i.e. we're past the last chunk and are done with -// iterating the range (and thus done with copying row chunks) -func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { +// which will be used for copying the next chunk of rows. It returns an IterationRangeValues +// struct with HasFurtherRange=false if there is no further chunk to work through. +// Thread-safe: uses a mutex to serialize access for concurrent row-copy. +// When advanceCursor is true, the function determines min from MigrationIterationRangeMaxValues +// (for concurrent mode where each goroutine advances the cursor). +// When advanceCursor is false, min is read from MigrationIterationRangeMinValues (pre-set by +// SetNextIterationRangeMinValues for single-threaded retry compatibility). +func (apl *Applier) CalculateNextIterationRangeEndValues(advanceCursor bool) (values *base.IterationRangeValues, err error) { + apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Lock() + defer apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Unlock() + + result := &base.IterationRangeValues{ + Size: atomic.LoadInt64(&apl.migrationContext.ChunkSize), + } + + if advanceCursor { + // Concurrent mode: advance min from current max cursor + result.Min = apl.migrationContext.MigrationIterationRangeMaxValues + if result.Min == nil { + result.Min = apl.migrationContext.MigrationRangeMinValues + result.IncludeMinValues = true + } + } else { + // Single-threaded mode: min was pre-set by SetNextIterationRangeMinValues + result.Min = apl.migrationContext.MigrationIterationRangeMinValues + if result.Min == nil { + result.Min = apl.migrationContext.MigrationRangeMinValues + } + // First iteration: include the minimum values. Use Iteration counter (not cursor state) + // because cursor is mutated on first calc success, but Iteration only advances after + // successful insert — so on retry of the first chunk, this still returns true. + result.IncludeMinValues = (apl.migrationContext.GetIteration() == 0) + } + for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -883,46 +918,49 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), + result.Min.AbstractValues(), apl.migrationContext.MigrationRangeMaxValues.AbstractValues(), - atomic.LoadInt64(&apl.migrationContext.ChunkSize), - apl.migrationContext.GetIteration() == 0, + result.Size, + result.IncludeMinValues, fmt.Sprintf("iteration:%d", apl.migrationContext.GetIteration()), ) if err != nil { - return hasFurtherRange, err + return result, err } rows, err := apl.db.Query(query, explodedArgs...) if err != nil { - return hasFurtherRange, err + return result, err } defer rows.Close() iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { - return hasFurtherRange, err + return result, err } - hasFurtherRange = true + result.HasFurtherRange = true } if err = rows.Err(); err != nil { - return hasFurtherRange, err + return result, err } - if hasFurtherRange { - apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues - return hasFurtherRange, nil + if result.HasFurtherRange { + result.Max = iterationRangeMaxValues + // Advance global cursor + apl.migrationContext.MigrationIterationRangeMinValues = result.Min + apl.migrationContext.MigrationIterationRangeMaxValues = result.Max + return result, nil } } apl.migrationContext.Log.Debugf("Iteration complete: no further range to iterate") - return hasFurtherRange, nil + return result, nil } // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. -func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +func (apl *Applier) ApplyIterationInsertQuery(iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, warnings []string, err error) { startTime := time.Now() - chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + chunkSize = iterationRangeValues.Size query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( apl.migrationContext.DatabaseName, @@ -932,21 +970,21 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i apl.migrationContext.MappedSharedColumns.Names(), apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), - apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), - apl.migrationContext.GetIteration() == 0, + iterationRangeValues.Min.AbstractValues(), + iterationRangeValues.Max.AbstractValues(), + iterationRangeValues.IncludeMinValues, apl.migrationContext.IsTransactionalTable(), // TODO: Don't hardcode this strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."), ) if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, nil, err } - sqlResult, err := func() (gosql.Result, error) { + sqlResult, sqlWarnings, err := func() (gosql.Result, []string, error) { tx, err := apl.db.Begin() if err != nil { - return nil, err + return nil, nil, err } defer tx.Rollback() @@ -954,30 +992,30 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery()) if _, err := tx.Exec(sessionQuery); err != nil { - return nil, err + return nil, nil, err } result, err := tx.Exec(query, explodedArgs...) if err != nil { - return nil, err + return nil, nil, err } + var collectedWarnings []string if apl.migrationContext.PanicOnWarnings { rows, err := tx.Query("SHOW WARNINGS") if err != nil { - return nil, err + return nil, nil, err } defer rows.Close() if err = rows.Err(); err != nil { - return nil, err + return nil, nil, err } // Compile regex once before loop to avoid performance penalty and handle errors properly migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() if err != nil { - return nil, err + return nil, nil, err } - var sqlWarnings []string for rows.Next() { var level, message string var code int @@ -988,29 +1026,32 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { continue } - sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + collectedWarnings = append(collectedWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + if err := rows.Err(); err != nil { + return nil, nil, err } - apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings } if err := tx.Commit(); err != nil { - return nil, err + return nil, nil, err } - return result, nil + return result, collectedWarnings, nil }() if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, nil, err } rowsAffected, _ = sqlResult.RowsAffected() duration = time.Since(startTime) + warnings = sqlWarnings apl.migrationContext.Log.Debugf( "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", - apl.migrationContext.MigrationIterationRangeMinValues, - apl.migrationContext.MigrationIterationRangeMaxValues, + iterationRangeValues.Min, + iterationRangeValues.Max, apl.migrationContext.GetIteration(), chunkSize) - return chunkSize, rowsAffected, duration, nil + return chunkSize, rowsAffected, duration, warnings, nil } // LockOriginalTable places a write lock on the original table diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6d7ba42f4..17b4bb483 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -606,17 +606,17 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + iterationRange, err := applier.CalculateNextIterationRangeEndValues(false) suite.Require().NoError(err) - suite.Require().True(hasFurtherRange) + suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(iterationRange) suite.Require().NoError(err) suite.Require().Equal(int64(0), rowsAffected) // Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly - suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings) + suite.Require().Empty(sqlWarnings) // Check that the row was inserted rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName()) @@ -686,17 +686,17 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + iterationRange, err := applier.CalculateNextIterationRangeEndValues(false) suite.Require().NoError(err) - suite.Require().True(hasFurtherRange) + suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(iterationRange) suite.Equal(int64(1), rowsAffected) suite.Require().NoError(err) // Verify the warning was recorded and will cause the migrator to panic - suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings) - suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1") + suite.Require().NotEmpty(sqlWarnings) + suite.Require().Contains(sqlWarnings[0], "Warning: Data truncated for column 'name' at row 1") } func (suite *ApplierTestSuite) TestWriteCheckpoint() { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 90fa8c509..8f109859d 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -20,6 +20,8 @@ import ( "github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" + + "golang.org/x/sync/errgroup" ) var ( @@ -1549,7 +1551,8 @@ func (mgtr *Migrator) initiateApplier() error { } // iterateChunks iterates the existing table rows, and generates a copy task of -// a chunk of rows onto the ghost table. +// a chunk of rows onto the ghost table. Supports concurrent chunk copying via +// --chunk-concurrent-size. func (mgtr *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.rowCopyComplete, err) @@ -1576,59 +1579,113 @@ func (mgtr *Migrator) iterateChunks() error { return nil } copyRowsFunc := func() error { - mgtr.migrationContext.SetNextIterationRangeMinValues() - // Copy task: - applyCopyRowsFunc := func() error { - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - // Done. - // There's another such check down the line - return nil - } + concurrentSize := atomic.LoadInt64(&mgtr.migrationContext.ChunkConcurrentSize) + if concurrentSize < 1 { + concurrentSize = 1 + } - // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() - if err != nil { - return err // wrapping call will retry - } - if !hasFurtherRange { - atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return terminateRowIteration(nil) - } - if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - // No need for more writes. - // This is the de-facto place where we avoid writing in the event of completed cut-over. - // There could _still_ be a race condition, but that's as close as we can get. - // What about the race condition? Well, there's actually no data integrity issue. - // when rowCopyCompleteFlag==1 that means **guaranteed** all necessary rows have been copied. - // But some are still then collected at the binary log, and these are the ones we're trying to - // not apply here. If the race condition wins over us, then we just attempt to apply onto the - // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. - return nil - } - _, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery() - if err != nil { - return err // wrapping call will retry - } + g, gctx := errgroup.WithContext(mgtr.migrationContext.GetContext()) + g.SetLimit(int(concurrentSize)) - if mgtr.migrationContext.PanicOnWarnings { - if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { - for _, warning := range mgtr.migrationContext.MigrationLastInsertSQLWarnings { - mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) - } - joinedWarnings := strings.Join(mgtr.migrationContext.MigrationLastInsertSQLWarnings, "; ") - return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) + for i := int64(0); i < concurrentSize; i++ { + g.Go(func() error { + if gctx.Err() != nil { + return gctx.Err() + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil } - } - atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) - atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) - return nil + if concurrentSize == 1 { + // Single-threaded path: matches master behavior exactly. + // Min is fixed before retry loop; range calc + insert are retried together. + // This allows hook-based chunk size reduction to take effect on retry. + mgtr.migrationContext.SetNextIterationRangeMinValues() + + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil + } + + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(false) + if err != nil { + return err + } + if !iterationRangeValues.HasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return nil + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } + + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(iterationRangeValues) + if err != nil { + return err + } + + if mgtr.migrationContext.PanicOnWarnings && len(sqlWarnings) > 0 { + for _, warning := range sqlWarnings { + mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) + } + joinedWarnings := strings.Join(sqlWarnings, "; ") + return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings) + } + + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + return nil + } + if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck + return err + } + } else { + // Concurrent path: range calculation is serialized under mutex upfront. + // Each goroutine gets its own range; retries apply to the INSERT only. + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(true) + if err != nil { + return err + } + if !iterationRangeValues.HasFurtherRange { + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + return nil + } + + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(iterationRangeValues) + if err != nil { + return err + } + + if mgtr.migrationContext.PanicOnWarnings && len(sqlWarnings) > 0 { + for _, warning := range sqlWarnings { + mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) + } + joinedWarnings := strings.Join(sqlWarnings, "; ") + return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings) + } + + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + return nil + } + if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck + return err + } + } + return nil + }) } - if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { + + if err := g.Wait(); err != nil { return terminateRowIteration(err) } - // record last successfully copied range + // record last successfully copied range (before checking termination flag, + // so the final batch's range is captured for resume) mgtr.applier.LastIterationRangeMutex.Lock() if mgtr.migrationContext.MigrationIterationRangeMinValues != nil && mgtr.migrationContext.MigrationIterationRangeMaxValues != nil { mgtr.applier.LastIterationRangeMinValues = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() @@ -1636,6 +1693,10 @@ func (mgtr *Migrator) iterateChunks() error { } mgtr.applier.LastIterationRangeMutex.Unlock() + if atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return terminateRowIteration(nil) + } + return nil } // Enqueue copy operation; to be executed by executeWriteFuncs()