From 394737e8d5150635d2aae26f8b52b860c7acbb14 Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Wed, 1 Jul 2026 15:28:07 +0200 Subject: [PATCH 1/2] fix(mtree): bound table-diff row collection with max_diff_rows (ACE-198) The Merkle-tree diff accumulated every differing row per node pair in memory with no limit, so a heavily diverged table (~1.5M differing rows) grew the in-memory diff to several GB and the process was OOM-killed. The classic table-diff engine already caps this via max_diff_rows; the mtree engine did not. Mirror that cap in the mtree path: DiffMtree resolves MaxDiffRows from table_diff.max_diff_rows (default 1000000; 0 = unbounded), appendDiffs stops collecting once a pair reaches the cap and marks the report with diff_row_limit_reached (warning once), and the fetch loops stop pulling further batches for a capped pair. A negative cap is rejected up front. This bounds memory and honestly flags a truncated diff instead of dying silently; the default still assumes a few GB of headroom, so lower it on memory-constrained hosts and re-run after repair. Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/commands/mtree/mtree-table-diff.md | 7 + internal/consistency/mtree/merkle.go | 150 +++++++++++++++++++--- internal/consistency/mtree/merkle_test.go | 80 ++++++++++++ 3 files changed, 216 insertions(+), 21 deletions(-) diff --git a/docs/commands/mtree/mtree-table-diff.md b/docs/commands/mtree/mtree-table-diff.md index 5e06170..4f85cf0 100644 --- a/docs/commands/mtree/mtree-table-diff.md +++ b/docs/commands/mtree/mtree-table-diff.md @@ -45,3 +45,10 @@ holding the node's slot, then re-run, if you need a guaranteed-current drain. **Notes** - With `--output html`, both JSON and HTML reports are generated with matching timestamps. +- The number of differing rows collected per node pair is bounded by + `table_diff.max_diff_rows` (the shipped `ace.yaml` sets `1000000`; if the key + is absent or `0`, the diff is unbounded). When the cap is reached, enumeration + for that pair stops, the report's `diff_row_limit_reached` flag is set, and a + warning is logged. This keeps a heavily diverged table from exhausting memory; + lower the value in `ace.yaml` on memory-constrained hosts, and re-run after + repairing to surface the remaining differences. diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 3a260a1..b5104b9 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -98,9 +98,21 @@ type MerkleTreeTask struct { mtreeSchema string - DiffResult types.DiffOutput - diffMutex sync.Mutex - diffRowKeySets map[string]map[string]map[string]struct{} + // MaxDiffRows bounds how many differing rows are collected per node pair. + // 0 (the default) is resolved from config.TableDiff.MaxDiffRows in + // DiffMtree; a non-positive value after that means unlimited. Without it a + // massively diverged table would accumulate every differing row in memory + // and OOM the process. + MaxDiffRows int64 + + DiffResult types.DiffOutput + diffMutex sync.Mutex + diffRowKeySets map[string]map[string]map[string]struct{} + // diffRowCounts tracks differing rows collected per node pair for the + // MaxDiffRows cap; diffLimitWarned makes the "limit reached" warning fire + // once. Both are guarded by diffMutex. + diffRowCounts map[string]int64 + diffLimitWarned bool StartTime time.Time NodeOriginNames map[string]map[string]string @@ -437,10 +449,15 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } const fetchBatchSize = 2000 + // Stable per-pair key: getNodePairs emits each pair once in a fixed order, + // so this is not canonicalised (unlike table_diff.pairKeyFor). nodePairKey := fmt.Sprintf("%s/%s", work.Node1["Name"], work.Node2["Name"]) if isComposite { for i := 0; i < len(mismatchedComposite); i += fetchBatchSize { + if m.pairCapReached(nodePairKey) { + break + } end := i + fetchBatchSize if end > len(mismatchedComposite) { end = len(mismatchedComposite) @@ -473,6 +490,9 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } } else { for i := 0; i < len(mismatchedSimple); i += fetchBatchSize { + if m.pairCapReached(nodePairKey) { + break + } end := i + fetchBatchSize if end > len(mismatchedSimple) { end = len(mismatchedSimple) @@ -541,6 +561,48 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error { return nil } +// pairLimitReachedLocked reports whether the node pair has already collected +// MaxDiffRows differing rows. The caller must hold diffMutex. +func (m *MerkleTreeTask) pairLimitReachedLocked(pairKey string) bool { + return m.MaxDiffRows > 0 && m.diffRowCounts[pairKey] >= m.MaxDiffRows +} + +// pairCapReached is the lock-safe counterpart of pairLimitReachedLocked, for +// callers (e.g. the fetch loop) that do not already hold diffMutex. It lets a +// worker stop fetching further batches once the pair is full. +func (m *MerkleTreeTask) pairCapReached(pairKey string) bool { + if m.MaxDiffRows <= 0 { + return false + } + m.diffMutex.Lock() + defer m.diffMutex.Unlock() + return m.diffRowCounts[pairKey] >= m.MaxDiffRows +} + +// recordPairRowLocked accounts for one collected differing row-pair and reports +// whether the pair has now reached MaxDiffRows. On first reaching the cap it +// flags the report as truncated and warns once. The caller must hold diffMutex. +func (m *MerkleTreeTask) recordPairRowLocked(pairKey string) bool { + if m.MaxDiffRows <= 0 { + return false + } + if m.diffRowCounts == nil { + m.diffRowCounts = make(map[string]int64) + } + m.diffRowCounts[pairKey]++ + if m.diffRowCounts[pairKey] >= m.MaxDiffRows { + m.DiffResult.Summary.DiffRowLimitReached = true + if !m.diffLimitWarned { + m.diffLimitWarned = true + logger.Warn("mtree table-diff: %s reached max_diff_rows limit (%d); enumeration for this pair stops (other pairs continue)", pairKey, m.MaxDiffRows) + } + return true + } + return false +} + +// appendDiffs records the differences between two fetched row batches, stopping +// once the node pair reaches MaxDiffRows. func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkItem, pr1, pr2 []types.OrderedMap) error { diffResult, err := utils.CompareRowSets(pr1, pr2, m.Key, m.Cols) if err != nil { @@ -565,35 +627,63 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI } var currentDiffRowsForPair int + limitReached := false + for _, row := range diffResult.Node1OnlyRows { + if m.pairLimitReachedLocked(nodePairKey) { + limitReached = true + break + } added, err := m.addRowToDiff(nodePairKey, node1Name, row) if err != nil { return err } if added { currentDiffRowsForPair++ + if m.recordPairRowLocked(nodePairKey) { + limitReached = true + break + } } } - for _, row := range diffResult.Node2OnlyRows { - added, err := m.addRowToDiff(nodePairKey, node2Name, row) - if err != nil { - return err - } - if added { - currentDiffRowsForPair++ + if !limitReached { + for _, row := range diffResult.Node2OnlyRows { + if m.pairLimitReachedLocked(nodePairKey) { + limitReached = true + break + } + added, err := m.addRowToDiff(nodePairKey, node2Name, row) + if err != nil { + return err + } + if added { + currentDiffRowsForPair++ + if m.recordPairRowLocked(nodePairKey) { + limitReached = true + break + } + } } } - for _, modRow := range diffResult.ModifiedRows { - added1, err := m.addRowToDiff(nodePairKey, node1Name, modRow.Node1Data) - if err != nil { - return err - } - added2, err := m.addRowToDiff(nodePairKey, node2Name, modRow.Node2Data) - if err != nil { - return err - } - if added1 || added2 { - currentDiffRowsForPair++ + if !limitReached { + for _, modRow := range diffResult.ModifiedRows { + if m.pairLimitReachedLocked(nodePairKey) { + break + } + added1, err := m.addRowToDiff(nodePairKey, node1Name, modRow.Node1Data) + if err != nil { + return err + } + added2, err := m.addRowToDiff(nodePairKey, node2Name, modRow.Node2Data) + if err != nil { + return err + } + if added1 || added2 { + currentDiffRowsForPair++ + if m.recordPairRowLocked(nodePairKey) { + break + } + } } } @@ -2034,6 +2124,12 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { m.finishLifecycle(recorder, start, err, resultCtx) }() + // Reject a negative cap up front so a bad value can't silently disable the + // row bound (mirrors table_diff's validation). + if m.MaxDiffRows < 0 { + return fmt.Errorf("max_diff_rows must be >= 0, got %d", m.MaxDiffRows) + } + if err = m.UpdateMtree(true); err != nil { return fmt.Errorf("failed to update merkle tree before diff: %w", err) } @@ -2054,12 +2150,24 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { }) m.StartTime = time.Now() + // Resolve the differing-row cap from config when the caller left it unset, + // mirroring the classic table-diff engine. Without a bound a heavily + // diverged table collects every differing row in memory and OOMs the + // process. + if m.MaxDiffRows == 0 { + if cfg := config.Get(); cfg != nil { + m.MaxDiffRows = cfg.TableDiff.MaxDiffRows + } + } + m.diffRowCounts = make(map[string]int64) + m.diffLimitWarned = false m.DiffResult = types.DiffOutput{ NodeDiffs: make(map[string]types.DiffByNodePair), Summary: types.DiffSummary{ Schema: m.Schema, Table: m.Table, Nodes: m.NodeList, + MaxDiffRows: m.MaxDiffRows, StartTime: time.Now().Format(time.RFC3339), DiffRowsCount: make(map[string]int), CDCSkippedNodes: m.CDCSkippedNodes, diff --git a/internal/consistency/mtree/merkle_test.go b/internal/consistency/mtree/merkle_test.go index ed0310b..5cd1b46 100644 --- a/internal/consistency/mtree/merkle_test.go +++ b/internal/consistency/mtree/merkle_test.go @@ -1,10 +1,90 @@ package mtree import ( + "context" + "fmt" "strings" "testing" + + "github.com/pgedge/ace/pkg/types" ) +// mtreeTaskWithDiffState returns a task wired just enough to exercise the diff +// accumulator (appendDiffs) without a database. +func mtreeTaskWithDiffState(maxDiffRows int64) *MerkleTreeTask { + m := &MerkleTreeTask{MaxDiffRows: maxDiffRows} + m.Key = []string{"id"} + m.Cols = []string{"id", "val"} + m.DiffResult = types.DiffOutput{ + NodeDiffs: make(map[string]types.DiffByNodePair), + Summary: types.DiffSummary{DiffRowsCount: make(map[string]int)}, + } + m.diffRowKeySets = make(map[string]map[string]map[string]struct{}) + m.diffRowCounts = make(map[string]int64) + return m +} + +// node1OnlyRows builds n rows with unique primary keys, all absent from the peer. +func node1OnlyRows(n int) []types.OrderedMap { + rows := make([]types.OrderedMap, n) + for i := range rows { + rows[i] = types.OrderedMap{{Key: "id", Value: fmt.Sprintf("%d", i)}, {Key: "val", Value: "x"}} + } + return rows +} + +// A diverged table stops collecting at max_diff_rows and marks the report truncated. +func TestMtreeDiffEnforcesMaxDiffRows(t *testing.T) { + const cap = 5 + m := mtreeTaskWithDiffState(cap) + work := CompareRangesWorkItem{ + Node1: map[string]any{"Name": "n1"}, + Node2: map[string]any{"Name": "n2"}, + } + + if err := m.appendDiffs("n1/n2", work, node1OnlyRows(20), nil); err != nil { + t.Fatalf("appendDiffs returned error: %v", err) + } + + if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != cap { + t.Errorf("collected %d rows for n1, want the cap of %d", got, cap) + } + if !m.DiffResult.Summary.DiffRowLimitReached { + t.Errorf("expected DiffRowLimitReached=true after exceeding the cap") + } +} + +// With no cap configured, every differing row is collected and nothing is flagged truncated. +func TestMtreeDiffNoLimitCollectsAll(t *testing.T) { + m := mtreeTaskWithDiffState(0) + work := CompareRangesWorkItem{ + Node1: map[string]any{"Name": "n1"}, + Node2: map[string]any{"Name": "n2"}, + } + + if err := m.appendDiffs("n1/n2", work, node1OnlyRows(20), nil); err != nil { + t.Fatalf("appendDiffs returned error: %v", err) + } + + if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != 20 { + t.Errorf("collected %d rows for n1, want all 20", got) + } + if m.DiffResult.Summary.DiffRowLimitReached { + t.Errorf("did not expect DiffRowLimitReached with no cap configured") + } +} + +// A negative max_diff_rows is rejected before any work runs. +func TestMtreeDiffRejectsNegativeMaxDiffRows(t *testing.T) { + m := &MerkleTreeTask{MaxDiffRows: -1, SkipDBUpdate: true} + m.Ctx = context.Background() + + err := m.DiffMtree() + if err == nil || !strings.Contains(err.Error(), "max_diff_rows must be >= 0") { + t.Fatalf("expected max_diff_rows validation error, got %v", err) + } +} + func TestIsNumericColType(t *testing.T) { tests := []struct { colType string From c04ee00fc950e25eb0bd1f467d76ee0595517c72 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Wed, 1 Jul 2026 11:58:10 -0700 Subject: [PATCH 2/2] fix(mtree): address CodeRabbit review on max_diff_rows - Flag DiffRowLimitReached only when a row is actually dropped, not on the last accepted row, so a pair with exactly max_diff_rows diffs is not falsely marked truncated. - Source the cap from mtree.diff.max_diff_rows like every other mtree tunable, and guard config resolution with > 0 so a negative value cannot silently disable the bound (matches table_diff). --- ace.yaml | 2 + docs/commands/mtree/mtree-table-diff.md | 2 +- internal/cli/default_config.yaml | 2 + internal/consistency/mtree/merkle.go | 131 ++++++++++++---------- internal/consistency/mtree/merkle_test.go | 22 ++++ pkg/config/config.go | 7 +- 6 files changed, 100 insertions(+), 66 deletions(-) diff --git a/ace.yaml b/ace.yaml index 365224a..a4572ac 100644 --- a/ace.yaml +++ b/ace.yaml @@ -43,6 +43,8 @@ mtree: min_block_size: 1000 block_size: 100000 max_block_size: 1000000 + # Max differing rows collected per node pair; 0 or absent means unbounded. + max_diff_rows: 1000000 # Example scheduler configuration: # schedule_jobs: diff --git a/docs/commands/mtree/mtree-table-diff.md b/docs/commands/mtree/mtree-table-diff.md index 4f85cf0..1cbe7b4 100644 --- a/docs/commands/mtree/mtree-table-diff.md +++ b/docs/commands/mtree/mtree-table-diff.md @@ -46,7 +46,7 @@ holding the node's slot, then re-run, if you need a guaranteed-current drain. - With `--output html`, both JSON and HTML reports are generated with matching timestamps. - The number of differing rows collected per node pair is bounded by - `table_diff.max_diff_rows` (the shipped `ace.yaml` sets `1000000`; if the key + `mtree.diff.max_diff_rows` (the shipped `ace.yaml` sets `1000000`; if the key is absent or `0`, the diff is unbounded). When the cap is reached, enumeration for that pair stops, the report's `diff_row_limit_reached` flag is set, and a warning is logged. This keeps a heavily diverged table from exhausting memory; diff --git a/internal/cli/default_config.yaml b/internal/cli/default_config.yaml index 7b67c73..f792a4c 100644 --- a/internal/cli/default_config.yaml +++ b/internal/cli/default_config.yaml @@ -42,6 +42,8 @@ mtree: min_block_size: 1000 block_size: 100000 max_block_size: 1000000 + # Max differing rows collected per node pair; 0 or absent means unbounded. + max_diff_rows: 1000000 # Example scheduler configuration: # schedule_jobs: diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index b5104b9..ee1fc9e 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -99,7 +99,7 @@ type MerkleTreeTask struct { mtreeSchema string // MaxDiffRows bounds how many differing rows are collected per node pair. - // 0 (the default) is resolved from config.TableDiff.MaxDiffRows in + // 0 (the default) is resolved from config.MTree.Diff.MaxDiffRows in // DiffMtree; a non-positive value after that means unlimited. Without it a // massively diverged table would accumulate every differing row in memory // and OOM the process. @@ -456,6 +456,8 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool if isComposite { for i := 0; i < len(mismatchedComposite); i += fetchBatchSize { if m.pairCapReached(nodePairKey) { + // The remaining mismatched keys are real diffs we are skipping. + m.notePairLimitReached(nodePairKey) break } end := i + fetchBatchSize @@ -491,6 +493,8 @@ func (m *MerkleTreeTask) processWorkItem(work CompareRangesWorkItem, pool1, pool } else { for i := 0; i < len(mismatchedSimple); i += fetchBatchSize { if m.pairCapReached(nodePairKey) { + // The remaining mismatched keys are real diffs we are skipping. + m.notePairLimitReached(nodePairKey) break } end := i + fetchBatchSize @@ -579,26 +583,38 @@ func (m *MerkleTreeTask) pairCapReached(pairKey string) bool { return m.diffRowCounts[pairKey] >= m.MaxDiffRows } -// recordPairRowLocked accounts for one collected differing row-pair and reports -// whether the pair has now reached MaxDiffRows. On first reaching the cap it -// flags the report as truncated and warns once. The caller must hold diffMutex. -func (m *MerkleTreeTask) recordPairRowLocked(pairKey string) bool { +// recordPairRowLocked accounts for one collected differing row-pair. The caller +// must hold diffMutex. It does not flag truncation: reaching MaxDiffRows on an +// accepted row does not mean anything was dropped (the pair may have exactly +// MaxDiffRows diffs). Truncation is flagged only when a further row is actually +// refused, via notePairLimitReachedLocked at the stop checks. +func (m *MerkleTreeTask) recordPairRowLocked(pairKey string) { if m.MaxDiffRows <= 0 { - return false + return } if m.diffRowCounts == nil { m.diffRowCounts = make(map[string]int64) } m.diffRowCounts[pairKey]++ - if m.diffRowCounts[pairKey] >= m.MaxDiffRows { - m.DiffResult.Summary.DiffRowLimitReached = true - if !m.diffLimitWarned { - m.diffLimitWarned = true - logger.Warn("mtree table-diff: %s reached max_diff_rows limit (%d); enumeration for this pair stops (other pairs continue)", pairKey, m.MaxDiffRows) - } - return true +} + +// notePairLimitReachedLocked marks the report truncated and warns once. Call it +// only when a differing row is actually being dropped because the pair is +// already at MaxDiffRows. The caller must hold diffMutex. +func (m *MerkleTreeTask) notePairLimitReachedLocked(pairKey string) { + m.DiffResult.Summary.DiffRowLimitReached = true + if !m.diffLimitWarned { + m.diffLimitWarned = true + logger.Warn("mtree table-diff: %s reached max_diff_rows limit (%d); enumeration for this pair stops (other pairs continue)", pairKey, m.MaxDiffRows) } - return false +} + +// notePairLimitReached is the lock-safe counterpart of notePairLimitReachedLocked, +// for callers (e.g. the fetch loop) that do not already hold diffMutex. +func (m *MerkleTreeTask) notePairLimitReached(pairKey string) { + m.diffMutex.Lock() + defer m.diffMutex.Unlock() + m.notePairLimitReachedLocked(pairKey) } // appendDiffs records the differences between two fetched row batches, stopping @@ -626,12 +642,14 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI m.DiffResult.NodeDiffs[nodePairKey].Rows[node2Name] = []types.OrderedMap{} } + // Each loop stops (and flags truncation) at its top check the moment it is + // asked to record a row while the pair is already full, so hitting the cap + // on the last accepted row does not spuriously mark the report truncated. var currentDiffRowsForPair int - limitReached := false for _, row := range diffResult.Node1OnlyRows { if m.pairLimitReachedLocked(nodePairKey) { - limitReached = true + m.notePairLimitReachedLocked(nodePairKey) break } added, err := m.addRowToDiff(nodePairKey, node1Name, row) @@ -640,50 +658,39 @@ func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkI } if added { currentDiffRowsForPair++ - if m.recordPairRowLocked(nodePairKey) { - limitReached = true - break - } + m.recordPairRowLocked(nodePairKey) } } - if !limitReached { - for _, row := range diffResult.Node2OnlyRows { - if m.pairLimitReachedLocked(nodePairKey) { - limitReached = true - break - } - added, err := m.addRowToDiff(nodePairKey, node2Name, row) - if err != nil { - return err - } - if added { - currentDiffRowsForPair++ - if m.recordPairRowLocked(nodePairKey) { - limitReached = true - break - } - } + for _, row := range diffResult.Node2OnlyRows { + if m.pairLimitReachedLocked(nodePairKey) { + m.notePairLimitReachedLocked(nodePairKey) + break + } + added, err := m.addRowToDiff(nodePairKey, node2Name, row) + if err != nil { + return err + } + if added { + currentDiffRowsForPair++ + m.recordPairRowLocked(nodePairKey) } } - if !limitReached { - for _, modRow := range diffResult.ModifiedRows { - if m.pairLimitReachedLocked(nodePairKey) { - break - } - added1, err := m.addRowToDiff(nodePairKey, node1Name, modRow.Node1Data) - if err != nil { - return err - } - added2, err := m.addRowToDiff(nodePairKey, node2Name, modRow.Node2Data) - if err != nil { - return err - } - if added1 || added2 { - currentDiffRowsForPair++ - if m.recordPairRowLocked(nodePairKey) { - break - } - } + for _, modRow := range diffResult.ModifiedRows { + if m.pairLimitReachedLocked(nodePairKey) { + m.notePairLimitReachedLocked(nodePairKey) + break + } + added1, err := m.addRowToDiff(nodePairKey, node1Name, modRow.Node1Data) + if err != nil { + return err + } + added2, err := m.addRowToDiff(nodePairKey, node2Name, modRow.Node2Data) + if err != nil { + return err + } + if added1 || added2 { + currentDiffRowsForPair++ + m.recordPairRowLocked(nodePairKey) } } @@ -2150,13 +2157,13 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { }) m.StartTime = time.Now() - // Resolve the differing-row cap from config when the caller left it unset, - // mirroring the classic table-diff engine. Without a bound a heavily - // diverged table collects every differing row in memory and OOMs the - // process. + // Resolve the differing-row cap from the mtree config when the caller left + // it unset, mirroring how every other mtree tunable is sourced from the + // mtree section. Without a bound a heavily diverged table collects every + // differing row in memory and OOMs the process. if m.MaxDiffRows == 0 { - if cfg := config.Get(); cfg != nil { - m.MaxDiffRows = cfg.TableDiff.MaxDiffRows + if cfg := config.Get(); cfg != nil && cfg.MTree.Diff.MaxDiffRows > 0 { + m.MaxDiffRows = cfg.MTree.Diff.MaxDiffRows } } m.diffRowCounts = make(map[string]int64) diff --git a/internal/consistency/mtree/merkle_test.go b/internal/consistency/mtree/merkle_test.go index 5cd1b46..deace82 100644 --- a/internal/consistency/mtree/merkle_test.go +++ b/internal/consistency/mtree/merkle_test.go @@ -54,6 +54,28 @@ func TestMtreeDiffEnforcesMaxDiffRows(t *testing.T) { } } +// A pair with exactly max_diff_rows diffs collects them all and is NOT marked +// truncated: reaching the cap on the last row does not mean anything was dropped. +func TestMtreeDiffExactCapNotTruncated(t *testing.T) { + const cap = 5 + m := mtreeTaskWithDiffState(cap) + work := CompareRangesWorkItem{ + Node1: map[string]any{"Name": "n1"}, + Node2: map[string]any{"Name": "n2"}, + } + + if err := m.appendDiffs("n1/n2", work, node1OnlyRows(cap), nil); err != nil { + t.Fatalf("appendDiffs returned error: %v", err) + } + + if got := len(m.DiffResult.NodeDiffs["n1/n2"].Rows["n1"]); got != cap { + t.Errorf("collected %d rows for n1, want all %d", got, cap) + } + if m.DiffResult.Summary.DiffRowLimitReached { + t.Errorf("did not expect DiffRowLimitReached when diffs exactly equal the cap") + } +} + // With no cap configured, every differing row is collected and nothing is flagged truncated. func TestMtreeDiffNoLimitCollectsAll(t *testing.T) { m := mtreeTaskWithDiffState(0) diff --git a/pkg/config/config.go b/pkg/config/config.go index 74be4f7..6911d4b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -67,9 +67,10 @@ type MTreeConfig struct { } `yaml:"cdc"` Schema string `yaml:"schema"` Diff struct { - MinBlockSize int `yaml:"min_block_size"` - BlockSize int `yaml:"block_size"` - MaxBlockSize int `yaml:"max_block_size"` + MinBlockSize int `yaml:"min_block_size"` + BlockSize int `yaml:"block_size"` + MaxBlockSize int `yaml:"max_block_size"` + MaxDiffRows int64 `yaml:"max_diff_rows"` } `yaml:"diff"` }