Skip to content

Commit d131335

Browse files
author
Lincoln Lo
committed
Add RowsVerified to InlineVerifier and EventsVerified to TargetVerifier
1 parent 7b03a45 commit d131335

7 files changed

Lines changed: 47 additions & 11 deletions

File tree

copydb/copydb.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ func (this *CopydbFerry) Run() {
109109
// should be identical.
110110
copyWG.Wait()
111111

112-
this.Ferry.StopTargetVerifier()
112+
err := this.Ferry.StopTargetVerifier()
113+
if err != nil {
114+
this.Ferry.ErrorHandler.Fatal("target_verifier", err)
115+
}
113116

114117
// This is where you cutover from using the source database to
115118
// using the target database.

ferry.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -844,11 +844,17 @@ func (f *Ferry) FlushBinlogAndStopStreaming() {
844844
f.BinlogStreamer.FlushAndStop()
845845
}
846846

847-
func (f *Ferry) StopTargetVerifier() {
847+
func (f *Ferry) StopTargetVerifier() error {
848848
if !f.Config.SkipTargetVerification {
849849
f.TargetVerifier.BinlogStreamer.FlushAndStop()
850850
f.targetVerifierWg.Wait()
851+
if f.TargetVerifier.EventsVerified == 0 {
852+
err := fmt.Errorf("no events verified")
853+
f.logger.WithField("error", err).Errorf("target verifier did not verify any events")
854+
return err
855+
}
851856
}
857+
return nil
852858
}
853859

854860
func (f *Ferry) SerializeStateToJSON() (string, error) {

inline_verifier.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ type InlineVerifier struct {
248248
reverifyStore *BinlogVerifyStore
249249
verifyDuringCutoverStarted AtomicBoolean
250250

251+
verifyCounter int
251252
sourceStmtCache *StmtCache
252253
targetStmtCache *StmtCache
253254
logger *logrus.Entry
@@ -435,6 +436,13 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
435436
return VerificationResult{}, err
436437
}
437438

439+
if v.verifyCounter == 0 {
440+
return VerificationResult{
441+
DataCorrect: false,
442+
Message: "cutover verification failed, verifier did not compare any events",
443+
}, nil
444+
}
445+
438446
if !mismatchFound {
439447
return VerificationResult{
440448
DataCorrect: true,
@@ -563,13 +571,15 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin
563571
if !bytes.Equal(sourceHash, targetHash) || !exists {
564572
mismatchSet[paginationKey] = struct{}{}
565573
}
574+
v.verifyCounter += 1
566575
}
567576

568577
for paginationKey, sourceHash := range source {
569578
targetHash, exists := target[paginationKey]
570579
if !bytes.Equal(sourceHash, targetHash) || !exists {
571580
mismatchSet[paginationKey] = struct{}{}
572581
}
582+
v.verifyCounter += 1
573583
}
574584

575585
return mismatchSet
@@ -580,6 +590,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
580590

581591
for paginationKey, targetDecompressedColumns := range target {
582592
sourceDecompressedColumns, exists := source[paginationKey]
593+
v.verifyCounter += 1
583594
if !exists {
584595
mismatchSet[paginationKey] = struct{}{}
585596
continue
@@ -596,6 +607,7 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
596607

597608
for paginationKey, sourceDecompressedColumns := range source {
598609
targetDecompressedColumns, exists := target[paginationKey]
610+
v.verifyCounter += 1
599611
if !exists {
600612
mismatchSet[paginationKey] = struct{}{}
601613
continue

sharding/sharding.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,10 @@ func (r *ShardingFerry) Run() {
157157
r.logger.WithField("error", err).Errorf("verification encountered an error, aborting run")
158158
r.Ferry.ErrorHandler.Fatal("inline_verifier", err)
159159
} else if !verificationResult.DataCorrect {
160-
err = fmt.Errorf("verifier detected data discrepancy: %s", verificationResult.Message)
160+
err = fmt.Errorf("verifier detected error: %s", verificationResult.Message)
161161
r.logger.WithField("error", err).Errorf("verification failed, aborting run")
162162
r.Ferry.ErrorHandler.Fatal("inline_verifier", err)
163163
}
164-
165164
metrics.Measure("CopyPrimaryKeyTables", nil, 1.0, func() {
166165
err = r.copyPrimaryKeyTables()
167166
})
@@ -172,7 +171,10 @@ func (r *ShardingFerry) Run() {
172171

173172
r.Ferry.Throttler.SetDisabled(false)
174173

175-
r.Ferry.StopTargetVerifier()
174+
err = r.Ferry.StopTargetVerifier()
175+
if err != nil {
176+
r.Ferry.ErrorHandler.Fatal("target_verifier", err)
177+
}
176178

177179
metrics.Measure("CutoverUnlock", nil, 1.0, func() {
178180
err = r.config.CutoverUnlock.Post(&client)

target_verifier.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type TargetVerifier struct {
1212
DB *sql.DB
1313
BinlogStreamer *BinlogStreamer
1414
StateTracker *StateTracker
15+
EventsVerified int
1516
}
1617

1718
func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error) {
@@ -36,7 +37,8 @@ func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error {
3637
return err
3738
}
3839

39-
// Ghostferry's annotation will alwaays be the first, if available
40+
t.EventsVerified += 1
41+
// Ghostferry's annotation will always be the first, if available
4042
if annotation == "" || annotation != t.DB.Marginalia {
4143
paginationKey, err := ev.PaginationKey()
4244
if err != nil {

test/integration/inline_verifier_test.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer
6868

6969
assert verification_ran
7070
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
71-
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
71+
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
7272
end
7373

7474
def test_same_decompressed_data_different_compressed_test_passes_inline_verification
@@ -430,7 +430,7 @@ def test_positive_negative_zero
430430

431431
assert verification_ran
432432
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
433-
assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
433+
assert_equal "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
434434

435435
# Now we run the real test case.
436436
target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
@@ -484,7 +484,7 @@ def test_null_vs_empty_string
484484

485485
assert verification_ran
486486
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
487-
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
487+
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
488488
end
489489

490490
def test_null_vs_null_string
@@ -507,7 +507,7 @@ def test_null_vs_null_string
507507

508508
assert verification_ran
509509
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
510-
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
510+
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
511511
end
512512

513513
def test_null_in_different_order
@@ -533,7 +533,7 @@ def test_null_in_different_order
533533

534534
assert verification_ran
535535
assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
536-
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
536+
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.first["msg"]
537537
end
538538

539539
###################

test/integration/trivial_test.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,15 @@ def test_logged_query_omits_columns
3939
end
4040
end
4141
end
42+
43+
def test_no_events_verified_will_log_error
44+
seed_random_data(source_db, number_of_rows: 0)
45+
seed_random_data(target_db, number_of_rows: 0)
46+
47+
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
48+
49+
ghostferry.run
50+
51+
assert_equal "no events verified", ghostferry.error_lines.last["error"]
52+
end
4253
end

0 commit comments

Comments
 (0)