diff --git a/ace.yaml b/ace.yaml index 365224a..a4572ac 100644 --- a/ace.yaml +++ b/ace.yaml @@ -43,6 +43,8 @@ mtree: min_block_size: 1000 block_size: 100000 max_block_size: 1000000 + # Max differing rows collected per node pair; 0 or absent means unbounded. + max_diff_rows: 1000000 # Example scheduler configuration: # schedule_jobs: diff --git a/docs/commands/mtree/mtree-table-diff.md b/docs/commands/mtree/mtree-table-diff.md index 5e06170..1cbe7b4 100644 --- a/docs/commands/mtree/mtree-table-diff.md +++ b/docs/commands/mtree/mtree-table-diff.md @@ -45,3 +45,10 @@ holding the node's slot, then re-run, if you need a guaranteed-current drain. **Notes** - With `--output html`, both JSON and HTML reports are generated with matching timestamps. +- The number of differing rows collected per node pair is bounded by + `mtree.diff.max_diff_rows` (the shipped `ace.yaml` sets `1000000`; if the key + is absent or `0`, the diff is unbounded). When a pair has more differing rows + than the cap, enumeration for that pair stops, `diff_row_limit_reached` is set + in the report, and a warning is logged once per run. This keeps a heavily + diverged table from exhausting memory; lower the value in `ace.yaml` on + memory-constrained hosts, and re-run after repairing to surface the remaining differences. diff --git a/internal/cli/default_config.yaml b/internal/cli/default_config.yaml index 7b67c73..f792a4c 100644 --- a/internal/cli/default_config.yaml +++ b/internal/cli/default_config.yaml @@ -42,6 +42,8 @@ mtree: min_block_size: 1000 block_size: 100000 max_block_size: 1000000 + # Max differing rows collected per node pair; 0 or absent means unbounded. 
+ max_diff_rows: 1000000 # Example scheduler configuration: # schedule_jobs: diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 3a260a1..ee1fc9e 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -98,9 +98,21 @@ type MerkleTreeTask struct { mtreeSchema string - DiffResult types.DiffOutput - diffMutex sync.Mutex - diffRowKeySets map[string]map[string]map[string]struct{} + // MaxDiffRows bounds how many differing rows are collected per node pair. + // 0 (the default) is resolved from config.MTree.Diff.MaxDiffRows in + // DiffMtree and means unlimited if it is still 0 afterwards; a negative + // value is rejected there. Without a bound a massively diverged table would + // accumulate every differing row in memory and OOM the process. + MaxDiffRows int64 + + DiffResult types.DiffOutput + diffMutex sync.Mutex + diffRowKeySets map[string]map[string]map[string]struct{} + // diffRowCounts tracks differing rows collected per node pair for the + // MaxDiffRows cap; diffLimitWarned makes the "limit reached" warning fire + // once. Both are guarded by diffMutex. + diffRowCounts map[string]int64 + diffLimitWarned bool StartTime time.Time NodeOriginNames map[string]map[string]string @@ -437,10 +449,17 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } const fetchBatchSize = 2000 + // Stable per-pair key: getNodePairs emits each pair once in a fixed order, + // so this is not canonicalised (unlike table_diff.pairKeyFor). nodePairKey := fmt.Sprintf("%s/%s", work.Node1["Name"], work.Node2["Name"]) if isComposite { for i := 0; i < len(mismatchedComposite); i += fetchBatchSize { + if m.pairCapReached(nodePairKey) { + // The remaining mismatched keys are real diffs we are skipping. 
+ m.notePairLimitReached(nodePairKey) + break + } end := i + fetchBatchSize if end > len(mismatchedComposite) { end = len(mismatchedComposite) @@ -473,6 +492,11 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } } else { for i := 0; i < len(mismatchedSimple); i += fetchBatchSize { + if m.pairCapReached(nodePairKey) { + // The remaining mismatched keys are real diffs we are skipping. + m.notePairLimitReached(nodePairKey) + break + } end := i + fetchBatchSize if end > len(mismatchedSimple) { end = len(mismatchedSimple) @@ -541,6 +565,60 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error { return nil } +// pairLimitReachedLocked reports whether the node pair has already collected +// MaxDiffRows differing rows. The caller must hold diffMutex. +func (m *MerkleTreeTask) pairLimitReachedLocked(pairKey string) bool { + return m.MaxDiffRows > 0 && m.diffRowCounts[pairKey] >= m.MaxDiffRows +} + +// pairCapReached is the lock-safe counterpart of pairLimitReachedLocked, for +// callers (e.g. the fetch loop) that do not already hold diffMutex. It lets a +// worker stop fetching further batches once the pair is full. +func (m *MerkleTreeTask) pairCapReached(pairKey string) bool { + if m.MaxDiffRows <= 0 { + return false + } + m.diffMutex.Lock() + defer m.diffMutex.Unlock() + return m.diffRowCounts[pairKey] >= m.MaxDiffRows +} + +// recordPairRowLocked accounts for one collected differing row-pair. The caller +// must hold diffMutex. It does not flag truncation: reaching MaxDiffRows on an +// accepted row does not mean anything was dropped (the pair may have exactly +// MaxDiffRows diffs). Truncation is flagged only when a further row is actually +// refused, via notePairLimitReachedLocked at the stop checks. 
+func (m *MerkleTreeTask) recordPairRowLocked(pairKey string) { + if m.MaxDiffRows <= 0 { + return + } + if m.diffRowCounts == nil { + m.diffRowCounts = make(map[string]int64) + } + m.diffRowCounts[pairKey]++ +} + +// notePairLimitReachedLocked marks the report truncated and warns once. Call it +// only when a differing row is actually being dropped because the pair is +// already at MaxDiffRows. The caller must hold diffMutex. +func (m *MerkleTreeTask) notePairLimitReachedLocked(pairKey string) { + m.DiffResult.Summary.DiffRowLimitReached = true + if !m.diffLimitWarned { + m.diffLimitWarned = true + logger.Warn("mtree table-diff: %s reached max_diff_rows limit (%d); enumeration for this pair stops (other pairs continue)", pairKey, m.MaxDiffRows) + } +} + +// notePairLimitReached is the lock-safe counterpart of notePairLimitReachedLocked, +// for callers (e.g. the fetch loop) that do not already hold diffMutex. +func (m *MerkleTreeTask) notePairLimitReached(pairKey string) { + m.diffMutex.Lock() + defer m.diffMutex.Unlock() + m.notePairLimitReachedLocked(pairKey) +} + +// appendDiffs records the differences between two fetched row batches, stopping +// once the node pair reaches MaxDiffRows. func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkItem, pr1, pr2 []types.OrderedMap) error { diffResult, err := utils.CompareRowSets(pr1, pr2, m.Key, m.Cols) if err != nil { @@ -564,26 +642,44 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI m.DiffResult.NodeDiffs[nodePairKey].Rows[node2Name] = []types.OrderedMap{} } + // Each loop stops (and flags truncation) at its top check the moment it is + // asked to record a row while the pair is already full, so hitting the cap + // on the last accepted row does not spuriously mark the report truncated. 
var currentDiffRowsForPair int + for _, row := range diffResult.Node1OnlyRows { + if m.pairLimitReachedLocked(nodePairKey) { + m.notePairLimitReachedLocked(nodePairKey) + break + } added, err := m.addRowToDiff(nodePairKey, node1Name, row) if err != nil { return err } if added { currentDiffRowsForPair++ + m.recordPairRowLocked(nodePairKey) } } for _, row := range diffResult.Node2OnlyRows { + if m.pairLimitReachedLocked(nodePairKey) { + m.notePairLimitReachedLocked(nodePairKey) + break + } added, err := m.addRowToDiff(nodePairKey, node2Name, row) if err != nil { return err } if added { currentDiffRowsForPair++ + m.recordPairRowLocked(nodePairKey) } } for _, modRow := range diffResult.ModifiedRows { + if m.pairLimitReachedLocked(nodePairKey) { + m.notePairLimitReachedLocked(nodePairKey) + break + } added1, err := m.addRowToDiff(nodePairKey, node1Name, modRow.Node1Data) if err != nil { return err @@ -594,6 +690,7 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI } if added1 || added2 { currentDiffRowsForPair++ + m.recordPairRowLocked(nodePairKey) } } @@ -2034,6 +2131,12 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { m.finishLifecycle(recorder, start, err, resultCtx) }() + // Reject a negative caller-set cap up front so it can't silently disable the + // row bound (mirrors table_diff's validation). NOTE(review): a negative config value is not caught here. + if m.MaxDiffRows < 0 { + return fmt.Errorf("max_diff_rows must be >= 0, got %d", m.MaxDiffRows) + } + if err = m.UpdateMtree(true); err != nil { return fmt.Errorf("failed to update merkle tree before diff: %w", err) } @@ -2054,12 +2157,24 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { }) m.StartTime = time.Now() + // Resolve the differing-row cap from the mtree config when the caller left + // it unset, mirroring how every other mtree tunable is sourced from the + // mtree section. Without a bound a heavily diverged table collects every + // differing row in memory and OOMs the process. 
+ if m.MaxDiffRows == 0 { + if cfg := config.Get(); cfg != nil && cfg.MTree.Diff.MaxDiffRows > 0 { + m.MaxDiffRows = cfg.MTree.Diff.MaxDiffRows + } + } + m.diffRowCounts = make(map[string]int64) + m.diffLimitWarned = false m.DiffResult = types.DiffOutput{ NodeDiffs: make(map[string]types.DiffByNodePair), Summary: types.DiffSummary{ Schema: m.Schema, Table: m.Table, Nodes: m.NodeList, + MaxDiffRows: m.MaxDiffRows, StartTime: time.Now().Format(time.RFC3339), DiffRowsCount: make(map[string]int), CDCSkippedNodes: m.CDCSkippedNodes, diff --git a/internal/consistency/mtree/merkle_test.go b/internal/consistency/mtree/merkle_test.go index ed0310b..deace82 100644 --- a/internal/consistency/mtree/merkle_test.go +++ b/internal/consistency/mtree/merkle_test.go @@ -1,10 +1,112 @@ package mtree import ( + "context" + "fmt" "strings" "testing" + + "github.com/pgedge/ace/pkg/types" ) +// mtreeTaskWithDiffState returns a task wired just enough to exercise the diff +// accumulator (appendDiffs) without a database. +func mtreeTaskWithDiffState(maxDiffRows int64) *MerkleTreeTask { + m := &MerkleTreeTask{MaxDiffRows: maxDiffRows} + m.Key = []string{"id"} + m.Cols = []string{"id", "val"} + m.DiffResult = types.DiffOutput{ + NodeDiffs: make(map[string]types.DiffByNodePair), + Summary: types.DiffSummary{DiffRowsCount: make(map[string]int)}, + } + m.diffRowKeySets = make(map[string]map[string]map[string]struct{}) + m.diffRowCounts = make(map[string]int64) + return m +} + +// node1OnlyRows builds n rows with unique primary keys, all absent from the peer. +func node1OnlyRows(n int) []types.OrderedMap { + rows := make([]types.OrderedMap, n) + for i := range rows { + rows[i] = types.OrderedMap{{Key: "id", Value: fmt.Sprintf("%d", i)}, {Key: "val", Value: "x"}} + } + return rows +} + +// A diverged table stops collecting at max_diff_rows and marks the report truncated. 
+func TestMtreeDiffEnforcesMaxDiffRows(t *testing.T) { + const cap = 5 + m := mtreeTaskWithDiffState(cap) + work := CompareRangesWorkItem{ + Node1: map[string]any{"Name": "n1"}, + Node2: map[string]any{"Name": "n2"}, + } + + if err := m.appendDiffs("n1/n2", work, node1OnlyRows(20), nil); err != nil { + t.Fatalf("appendDiffs returned error: %v", err) + } + + if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != cap { + t.Errorf("collected %d rows for n1, want the cap of %d", got, cap) + } + if !m.DiffResult.Summary.DiffRowLimitReached { + t.Errorf("expected DiffRowLimitReached=true after exceeding the cap") + } +} + +// A pair with exactly max_diff_rows diffs collects them all and is NOT marked +// truncated: reaching the cap on the last row does not mean anything was dropped. +func TestMtreeDiffExactCapNotTruncated(t *testing.T) { + const cap = 5 + m := mtreeTaskWithDiffState(cap) + work := CompareRangesWorkItem{ + Node1: map[string]any{"Name": "n1"}, + Node2: map[string]any{"Name": "n2"}, + } + + if err := m.appendDiffs("n1/n2", work, node1OnlyRows(cap), nil); err != nil { + t.Fatalf("appendDiffs returned error: %v", err) + } + + if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != cap { + t.Errorf("collected %d rows for n1, want all %d", got, cap) + } + if m.DiffResult.Summary.DiffRowLimitReached { + t.Errorf("did not expect DiffRowLimitReached when diffs exactly equal the cap") + } +} + +// With no cap configured, every differing row is collected and nothing is flagged truncated. 
+func TestMtreeDiffNoLimitCollectsAll(t *testing.T) { + m := mtreeTaskWithDiffState(0) + work := CompareRangesWorkItem{ + Node1: map[string]any{"Name": "n1"}, + Node2: map[string]any{"Name": "n2"}, + } + + if err := m.appendDiffs("n1/n2", work, node1OnlyRows(20), nil); err != nil { + t.Fatalf("appendDiffs returned error: %v", err) + } + + if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != 20 { + t.Errorf("collected %d rows for n1, want all 20", got) + } + if m.DiffResult.Summary.DiffRowLimitReached { + t.Errorf("did not expect DiffRowLimitReached with no cap configured") + } +} + +// A negative max_diff_rows is rejected before any work runs. +func TestMtreeDiffRejectsNegativeMaxDiffRows(t *testing.T) { + m := &MerkleTreeTask{MaxDiffRows: -1, SkipDBUpdate: true} + m.Ctx = context.Background() + + err := m.DiffMtree() + if err == nil || !strings.Contains(err.Error(), "max_diff_rows must be >= 0") { + t.Fatalf("expected max_diff_rows validation error, got %v", err) + } +} + func TestIsNumericColType(t *testing.T) { tests := []struct { colType string diff --git a/pkg/config/config.go b/pkg/config/config.go index 74be4f7..6911d4b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -67,9 +67,10 @@ type MTreeConfig struct { } `yaml:"cdc"` Schema string `yaml:"schema"` Diff struct { - MinBlockSize int `yaml:"min_block_size"` - BlockSize int `yaml:"block_size"` - MaxBlockSize int `yaml:"max_block_size"` + MinBlockSize int `yaml:"min_block_size"` + BlockSize int `yaml:"block_size"` + MaxBlockSize int `yaml:"max_block_size"` + MaxDiffRows int64 `yaml:"max_diff_rows"` } `yaml:"diff"` }