Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ mtree:
min_block_size: 1000
block_size: 100000
max_block_size: 1000000
# Max differing rows collected per node pair; 0 or absent means unbounded.
max_diff_rows: 1000000

# Example scheduler configuration:
# schedule_jobs:
Expand Down
7 changes: 7 additions & 0 deletions docs/commands/mtree/mtree-table-diff.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ holding the node's slot, then re-run, if you need a guaranteed-current drain.
**Notes**

- With `--output html`, both JSON and HTML reports are generated with matching timestamps.
- The number of differing rows collected per node pair is bounded by
`mtree.diff.max_diff_rows` (the shipped `ace.yaml` sets `1000000`; if the key
is absent or `0`, the diff is unbounded). When the cap is reached, enumeration
for that pair stops, the report's `diff_row_limit_reached` flag is set, and a
warning is logged. This keeps a heavily diverged table from exhausting memory;
lower the value in `ace.yaml` on memory-constrained hosts, and re-run after
repairing to surface the remaining differences.
2 changes: 2 additions & 0 deletions internal/cli/default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ mtree:
min_block_size: 1000
block_size: 100000
max_block_size: 1000000
# Max differing rows collected per node pair; 0 or absent means unbounded.
max_diff_rows: 1000000

# Example scheduler configuration:
# schedule_jobs:
Expand Down
121 changes: 118 additions & 3 deletions internal/consistency/mtree/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,21 @@ type MerkleTreeTask struct {

mtreeSchema string

DiffResult types.DiffOutput
diffMutex sync.Mutex
diffRowKeySets map[string]map[string]map[string]struct{}
// MaxDiffRows bounds how many differing rows are collected per node pair.
// 0 (the default) is resolved from config.MTree.Diff.MaxDiffRows in
// DiffMtree; a non-positive value after that means unlimited. Without it a
// massively diverged table would accumulate every differing row in memory
// and OOM the process.
MaxDiffRows int64

DiffResult types.DiffOutput
diffMutex sync.Mutex
diffRowKeySets map[string]map[string]map[string]struct{}
// diffRowCounts tracks differing rows collected per node pair for the
// MaxDiffRows cap; diffLimitWarned makes the "limit reached" warning fire
// once. Both are guarded by diffMutex.
diffRowCounts map[string]int64
diffLimitWarned bool
StartTime time.Time
NodeOriginNames map[string]map[string]string

Expand Down Expand Up @@ -437,10 +449,17 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool
}

const fetchBatchSize = 2000
// Stable per-pair key: getNodePairs emits each pair once in a fixed order,
// so this is not canonicalised (unlike table_diff.pairKeyFor).
nodePairKey := fmt.Sprintf("%s/%s", work.Node1["Name"], work.Node2["Name"])

if isComposite {
for i := 0; i < len(mismatchedComposite); i += fetchBatchSize {
if m.pairCapReached(nodePairKey) {
// The remaining mismatched keys are real diffs we are skipping.
m.notePairLimitReached(nodePairKey)
break
}
end := i + fetchBatchSize
if end > len(mismatchedComposite) {
end = len(mismatchedComposite)
Expand Down Expand Up @@ -473,6 +492,11 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool
}
} else {
for i := 0; i < len(mismatchedSimple); i += fetchBatchSize {
if m.pairCapReached(nodePairKey) {
// The remaining mismatched keys are real diffs we are skipping.
m.notePairLimitReached(nodePairKey)
break
}
end := i + fetchBatchSize
if end > len(mismatchedSimple) {
end = len(mismatchedSimple)
Expand Down Expand Up @@ -541,6 +565,60 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error {
return nil
}

// pairLimitReachedLocked reports whether the node pair has already collected
// MaxDiffRows differing rows. The caller must hold diffMutex.
func (m *MerkleTreeTask) pairLimitReachedLocked(pairKey string) bool {
return m.MaxDiffRows > 0 && m.diffRowCounts[pairKey] >= m.MaxDiffRows
}

// pairCapReached is the lock-safe counterpart of pairLimitReachedLocked, for
// callers (e.g. the fetch loop) that do not already hold diffMutex. It lets a
// worker stop fetching further batches once the pair is full.
func (m *MerkleTreeTask) pairCapReached(pairKey string) bool {
if m.MaxDiffRows <= 0 {
return false
}
m.diffMutex.Lock()
defer m.diffMutex.Unlock()
return m.diffRowCounts[pairKey] >= m.MaxDiffRows
}

// recordPairRowLocked accounts for one collected differing row-pair. The caller
// must hold diffMutex. It does not flag truncation: reaching MaxDiffRows on an
// accepted row does not mean anything was dropped (the pair may have exactly
// MaxDiffRows diffs). Truncation is flagged only when a further row is actually
// refused, via notePairLimitReachedLocked at the stop checks.
func (m *MerkleTreeTask) recordPairRowLocked(pairKey string) {
if m.MaxDiffRows <= 0 {
return
}
if m.diffRowCounts == nil {
m.diffRowCounts = make(map[string]int64)
}
m.diffRowCounts[pairKey]++
}

// notePairLimitReachedLocked marks the report truncated and warns once. Call it
// only when a differing row is actually being dropped because the pair is
// already at MaxDiffRows. The caller must hold diffMutex.
func (m *MerkleTreeTask) notePairLimitReachedLocked(pairKey string) {
m.DiffResult.Summary.DiffRowLimitReached = true
if !m.diffLimitWarned {
m.diffLimitWarned = true
logger.Warn("mtree table-diff: %s reached max_diff_rows limit (%d); enumeration for this pair stops (other pairs continue)", pairKey, m.MaxDiffRows)
}
}

// notePairLimitReached is the lock-safe counterpart of notePairLimitReachedLocked,
// for callers (e.g. the fetch loop) that do not already hold diffMutex.
func (m *MerkleTreeTask) notePairLimitReached(pairKey string) {
m.diffMutex.Lock()
defer m.diffMutex.Unlock()
m.notePairLimitReachedLocked(pairKey)
}

// appendDiffs records the differences between two fetched row batches, stopping
// once the node pair reaches MaxDiffRows.
func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkItem, pr1, pr2 []types.OrderedMap) error {
diffResult, err := utils.CompareRowSets(pr1, pr2, m.Key, m.Cols)
if err != nil {
Expand All @@ -564,26 +642,44 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI
m.DiffResult.NodeDiffs[nodePairKey].Rows[node2Name] = []types.OrderedMap{}
}

// Each loop stops (and flags truncation) at its top check the moment it is
// asked to record a row while the pair is already full, so hitting the cap
// on the last accepted row does not spuriously mark the report truncated.
var currentDiffRowsForPair int

for _, row := range diffResult.Node1OnlyRows {
if m.pairLimitReachedLocked(nodePairKey) {
m.notePairLimitReachedLocked(nodePairKey)
break
}
added, err := m.addRowToDiff(nodePairKey, node1Name, row)
if err != nil {
return err
}
if added {
currentDiffRowsForPair++
m.recordPairRowLocked(nodePairKey)
}
}
for _, row := range diffResult.Node2OnlyRows {
if m.pairLimitReachedLocked(nodePairKey) {
m.notePairLimitReachedLocked(nodePairKey)
break
}
added, err := m.addRowToDiff(nodePairKey, node2Name, row)
if err != nil {
return err
}
if added {
currentDiffRowsForPair++
m.recordPairRowLocked(nodePairKey)
}
}
for _, modRow := range diffResult.ModifiedRows {
if m.pairLimitReachedLocked(nodePairKey) {
m.notePairLimitReachedLocked(nodePairKey)
break
}
added1, err := m.addRowToDiff(nodePairKey, node1Name, modRow.Node1Data)
if err != nil {
return err
Expand All @@ -594,6 +690,7 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI
}
if added1 || added2 {
currentDiffRowsForPair++
m.recordPairRowLocked(nodePairKey)
}
}

Expand Down Expand Up @@ -2034,6 +2131,12 @@ func (m *MerkleTreeTask) DiffMtree() (err error) {
m.finishLifecycle(recorder, start, err, resultCtx)
}()

// Reject a negative cap up front so a bad value can't silently disable the
// row bound (mirrors table_diff's validation).
if m.MaxDiffRows < 0 {
return fmt.Errorf("max_diff_rows must be >= 0, got %d", m.MaxDiffRows)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if err = m.UpdateMtree(true); err != nil {
return fmt.Errorf("failed to update merkle tree before diff: %w", err)
}
Expand All @@ -2054,12 +2157,24 @@ func (m *MerkleTreeTask) DiffMtree() (err error) {
})

m.StartTime = time.Now()
// Resolve the differing-row cap from the mtree config when the caller left
// it unset, mirroring how every other mtree tunable is sourced from the
// mtree section. Without a bound a heavily diverged table collects every
// differing row in memory and OOMs the process.
if m.MaxDiffRows == 0 {
if cfg := config.Get(); cfg != nil && cfg.MTree.Diff.MaxDiffRows > 0 {
m.MaxDiffRows = cfg.MTree.Diff.MaxDiffRows
}
}
m.diffRowCounts = make(map[string]int64)
m.diffLimitWarned = false
m.DiffResult = types.DiffOutput{
NodeDiffs: make(map[string]types.DiffByNodePair),
Summary: types.DiffSummary{
Schema: m.Schema,
Table: m.Table,
Nodes: m.NodeList,
MaxDiffRows: m.MaxDiffRows,
StartTime: time.Now().Format(time.RFC3339),
DiffRowsCount: make(map[string]int),
CDCSkippedNodes: m.CDCSkippedNodes,
Expand Down
102 changes: 102 additions & 0 deletions internal/consistency/mtree/merkle_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,112 @@
package mtree

import (
"context"
"fmt"
"strings"
"testing"

"github.com/pgedge/ace/pkg/types"
)

// mtreeTaskWithDiffState returns a task wired just enough to exercise the diff
// accumulator (appendDiffs) without a database.
func mtreeTaskWithDiffState(maxDiffRows int64) *MerkleTreeTask {
m := &MerkleTreeTask{MaxDiffRows: maxDiffRows}
m.Key = []string{"id"}
m.Cols = []string{"id", "val"}
m.DiffResult = types.DiffOutput{
NodeDiffs: make(map[string]types.DiffByNodePair),
Summary: types.DiffSummary{DiffRowsCount: make(map[string]int)},
}
m.diffRowKeySets = make(map[string]map[string]map[string]struct{})
m.diffRowCounts = make(map[string]int64)
return m
}

// node1OnlyRows builds n rows with unique primary keys, all absent from the peer.
func node1OnlyRows(n int) []types.OrderedMap {
rows := make([]types.OrderedMap, n)
for i := range rows {
rows[i] = types.OrderedMap{{Key: "id", Value: fmt.Sprintf("%d", i)}, {Key: "val", Value: "x"}}
}
return rows
}

// A diverged table stops collecting at max_diff_rows and marks the report truncated.
func TestMtreeDiffEnforcesMaxDiffRows(t *testing.T) {
const cap = 5
m := mtreeTaskWithDiffState(cap)
work := CompareRangesWorkItem{
Node1: map[string]any{"Name": "n1"},
Node2: map[string]any{"Name": "n2"},
}

if err := m.appendDiffs("n1/n2", work, node1OnlyRows(20), nil); err != nil {
t.Fatalf("appendDiffs returned error: %v", err)
}

if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != cap {
t.Errorf("collected %d rows for n1, want the cap of %d", got, cap)
}
if !m.DiffResult.Summary.DiffRowLimitReached {
t.Errorf("expected DiffRowLimitReached=true after exceeding the cap")
}
}

// A pair with exactly max_diff_rows diffs collects them all and is NOT marked
// truncated: reaching the cap on the last row does not mean anything was dropped.
func TestMtreeDiffExactCapNotTruncated(t *testing.T) {
const cap = 5
m := mtreeTaskWithDiffState(cap)
work := CompareRangesWorkItem{
Node1: map[string]any{"Name": "n1"},
Node2: map[string]any{"Name": "n2"},
}

if err := m.appendDiffs("n1/n2", work, node1OnlyRows(cap), nil); err != nil {
t.Fatalf("appendDiffs returned error: %v", err)
}

if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != cap {
t.Errorf("collected %d rows for n1, want all %d", got, cap)
}
if m.DiffResult.Summary.DiffRowLimitReached {
t.Errorf("did not expect DiffRowLimitReached when diffs exactly equal the cap")
}
}

// With no cap configured, every differing row is collected and nothing is flagged truncated.
func TestMtreeDiffNoLimitCollectsAll(t *testing.T) {
m := mtreeTaskWithDiffState(0)
work := CompareRangesWorkItem{
Node1: map[string]any{"Name": "n1"},
Node2: map[string]any{"Name": "n2"},
}

if err := m.appendDiffs("n1/n2", work, node1OnlyRows(20), nil); err != nil {
t.Fatalf("appendDiffs returned error: %v", err)
}

if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != 20 {
t.Errorf("collected %d rows for n1, want all 20", got)
}
if m.DiffResult.Summary.DiffRowLimitReached {
t.Errorf("did not expect DiffRowLimitReached with no cap configured")
}
}

// A negative max_diff_rows is rejected before any work runs.
func TestMtreeDiffRejectsNegativeMaxDiffRows(t *testing.T) {
m := &MerkleTreeTask{MaxDiffRows: -1, SkipDBUpdate: true}
m.Ctx = context.Background()

err := m.DiffMtree()
if err == nil || !strings.Contains(err.Error(), "max_diff_rows must be >= 0") {
t.Fatalf("expected max_diff_rows validation error, got %v", err)
}
}

func TestIsNumericColType(t *testing.T) {
tests := []struct {
colType string
Expand Down
7 changes: 4 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ type MTreeConfig struct {
} `yaml:"cdc"`
Schema string `yaml:"schema"`
Diff struct {
MinBlockSize int `yaml:"min_block_size"`
BlockSize int `yaml:"block_size"`
MaxBlockSize int `yaml:"max_block_size"`
MinBlockSize int `yaml:"min_block_size"`
BlockSize int `yaml:"block_size"`
MaxBlockSize int `yaml:"max_block_size"`
MaxDiffRows int64 `yaml:"max_diff_rows"`
} `yaml:"diff"`
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
Loading