From e7e34e2270c0399ade0139ff9c35de202a46bf2b Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Wed, 1 Apr 2026 20:47:27 -0700 Subject: [PATCH 1/4] Add failing test for retry + abort issue --- go/logic/migrator_test.go | 99 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 7b02c6b3f..39c769bab 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -1210,3 +1210,102 @@ func TestCheckAbort_DetectsContextCancellation(t *testing.T) { t.Fatal("Expected checkAbort to return error when context is cancelled") } } + +func (suite *MigratorTestSuite) TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries() { + ctx := context.Background() + + // Create table with email column (no unique constraint initially) + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100))", getTestTableName())) + suite.Require().NoError(err) + + // Insert initial rows with unique email values - passes pre-flight validation + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user2@example.com')", getTestTableName())) + suite.Require().NoError(err) + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user3@example.com')", getTestTableName())) + suite.Require().NoError(err) + + // Verify we have 3 rows + var count int + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count) + suite.Require().NoError(err) + suite.Require().Equal(3, count) + + // Create postpone flag file + tmpDir, err := os.MkdirTemp("", "gh-ost-postpone-test") + suite.Require().NoError(err) + defer os.RemoveAll(tmpDir) + postponeFlagFile := filepath.Join(tmpDir, "postpone.flag") + err = os.WriteFile(postponeFlagFile, []byte{}, 0644) + suite.Require().NoError(err) + + // Start migration in goroutine + done := make(chan error, 1) + go func() { + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + if err != nil { + done <- err + return + } + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.InspectorConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + migrationContext.AlterStatementOptions = "ADD UNIQUE KEY unique_email_idx (email)" + migrationContext.HeartbeatIntervalMilliseconds = 100 + migrationContext.PostponeCutOverFlagFile = postponeFlagFile + migrationContext.PanicOnWarnings = true + + // High retry count + exponential backoff means retries will take a long time and fail the test if not properly aborted + migrationContext.SetDefaultNumRetries(30) + migrationContext.CutOverExponentialBackoff = true + migrationContext.SetExponentialBackoffMaxInterval(128) + + migrator := NewMigrator(migrationContext, "0.0.0") + + //nolint:contextcheck + done <- migrator.Migrate() + }() + + // Wait for migration to reach postponed state + // TODO replace this with an actual check for postponed state + time.Sleep(3 * time.Second) + + // Now insert a duplicate email value while migration is postponed + // This simulates data arriving during migration that would violate the unique constraint + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName())) + suite.Require().NoError(err) + + // Verify we now have 4 rows (including the duplicate) + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count) + suite.Require().NoError(err) + suite.Require().Equal(4, count) + + // Unpostpone the migration - gh-ost will now try to apply binlog events with the duplicate + err = os.Remove(postponeFlagFile) + suite.Require().NoError(err) + + // Wait for Migrate() to return - with timeout to detect if it hangs + select { + case migrateErr := <-done: + // Success - Migrate() returned + // It should return an error due to the duplicate + suite.Require().Error(migrateErr, "Expected migration to fail due to duplicate key violation") + suite.Require().Contains(migrateErr.Error(), "Duplicate entry", "Error should mention duplicate entry") + case <-time.After(5 * time.Minute): + suite.FailNow("Migrate() hung and did not return within 5 minutes - failure to abort on warnings in retry loop") + } + + // Verify all 4 rows are still in the original table (no silent data loss) + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count) + suite.Require().NoError(err) + suite.Require().Equal(4, count, "Original table should still have all 4 rows") + + // Verify both user1@example.com entries still exist + var duplicateCount int + err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE email = 'user1@example.com'", getTestTableName())).Scan(&duplicateCount) + suite.Require().NoError(err) + suite.Require().Equal(2, duplicateCount, "Should have 2 duplicate email entries") +} From cb9054a355f638f28c365bbe68d4261129885672 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 2 Apr 2026 09:39:50 -0700 Subject: [PATCH 2/4] Fix retry after abort issue --- go/logic/migrator.go | 8 +++++ go/logic/migrator_test.go | 61 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e3e6d429d..98d2feecf 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -160,6 +160,10 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // sleep after previous iteration RetrySleepFn(1 * time.Second) } + // Check for abort/context cancellation before each retry + if abortErr := this.checkAbort(); abortErr != nil { + return abortErr + } err = operation() if err == nil { return nil @@ -190,6 +194,10 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro if i != 0 { RetrySleepFn(time.Duration(interval) * time.Second) } + // Check for abort/context cancellation before each retry + if abortErr := this.checkAbort(); abortErr != nil { + return abortErr + } err = operation() if err == nil { return nil diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 39c769bab..1f94939eb 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -714,6 +714,67 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) { assert.Equal(t, tries, 100) } +func TestMigratorRetryAbortsOnContextCancellation(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + // No sleep needed for this test + } + + var tries = 0 + retryable := func() error { + tries++ + if tries == 5 { + // Cancel context on 5th try + migrationContext.CancelContext() + } + return errors.New("Simulated error") + } + + result := migrator.retryOperation(retryable, false) + assert.Error(t, result) + // Should abort after 6 tries: 5 failures + 1 checkAbort detection + assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries) + // Verify we got context cancellation error + assert.Contains(t, result.Error(), "context canceled") +} + +func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrationContext.SetExponentialBackoffMaxInterval(42) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + // No sleep needed for this test + } + + var tries = 0 + retryable := func() error { + tries++ + if tries == 5 { + // Cancel context on 5th try + migrationContext.CancelContext() + } + return errors.New("Simulated error") + } + + result := migrator.retryOperationWithExponentialBackoff(retryable, false) + assert.Error(t, result) + // Should abort after 6 tries: 5 failures + 1 checkAbort detection + assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries) + // Verify we got context cancellation error + assert.Contains(t, result.Error(), "context canceled") +} + func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() { ctx := context.Background() From fe40ebd9b7df9db8224ced7082c12e3d4031f89c Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 2 Apr 2026 10:25:54 -0700 Subject: [PATCH 3/4] Skip retries for warning errors Warning errors indicate data consistency issues that won't resolve on retry, so attempting to retry them is futile and causes unnecessary delays. This change detects warning errors early and aborts immediately instead of retrying. --- go/logic/migrator.go | 14 +++++++++++ go/logic/migrator_test.go | 51 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 98d2feecf..f32d859b8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -168,6 +168,13 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo if err == nil { return nil } + // Check if this is an unrecoverable error (data consistency issues won't resolve on retry) + if strings.Contains(err.Error(), "warnings detected") { + if len(notFatalHint) == 0 { + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + } + return err + } // there's an error. Let's try again. } if len(notFatalHint) == 0 { @@ -202,6 +209,13 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro if err == nil { return nil } + // Check if this is an unrecoverable error (data consistency issues won't resolve on retry) + if strings.Contains(err.Error(), "warnings detected") { + if len(notFatalHint) == 0 { + _ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err) + } + return err + } } if len(notFatalHint) == 0 { // Use helper to prevent deadlock if listenOnPanicAbort already exited diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 1f94939eb..f731035e1 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -775,6 +775,57 @@ func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testi assert.Contains(t, result.Error(), "context canceled") } +func TestMigratorRetrySkipsRetriesForWarnings(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + t.Fatal("Should not sleep/retry for warning errors") + } + + var tries = 0 + retryable := func() error { + tries++ + return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]") + } + + result := migrator.retryOperation(retryable, false) + assert.Error(t, result) + // Should only try once - no retries for warnings + assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error") + assert.Contains(t, result.Error(), "warnings detected") +} + +func TestMigratorRetryWithExponentialBackoffSkipsRetriesForWarnings(t *testing.T) { + oldRetrySleepFn := RetrySleepFn + defer func() { RetrySleepFn = oldRetrySleepFn }() + + migrationContext := base.NewMigrationContext() + migrationContext.SetDefaultNumRetries(100) + migrationContext.SetExponentialBackoffMaxInterval(42) + migrator := NewMigrator(migrationContext, "1.2.3") + + RetrySleepFn = func(duration time.Duration) { + t.Fatal("Should not sleep/retry for warning errors") + } + + var tries = 0 + retryable := func() error { + tries++ + return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]") + } + + result := migrator.retryOperationWithExponentialBackoff(retryable, false) + assert.Error(t, result) + // Should only try once - no retries for warnings + assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error") + assert.Contains(t, result.Error(), "warnings detected") +} + func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() { ctx := context.Background() From 3c173ae74e09e8810aff1bf9d8a8b93b3fba9928 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Thu, 2 Apr 2026 10:49:15 -0700 Subject: [PATCH 4/4] Fix test expectation --- .../expect_failure | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure index fb8dc562a..5a6e5411e 100644 --- a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure @@ -1 +1 @@ -ERROR warnings detected in statement 1 of 1 +ERROR warnings detected in statement