From 48abdaf50e8581f25a2117c5f1cd992f3ca6c051 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Tue, 30 Jun 2026 17:30:33 -0700 Subject: [PATCH 1/2] fix(mtree): allow table-diff/update to run while mtree listen is active (ACE-199) Previously `mtree table-diff` and `mtree update` failed with SQLSTATE 55006 when `mtree listen` was running: both consumed the same logical replication slot, and PostgreSQL allows only one active consumer per slot. Now, when a node's slot is already held by a running listener, the bounded CDC drain returns a sentinel error and UpdateMtree skips that node's drain (best-effort), comparing against the tree listen keeps warm, instead of aborting. Nodes without an active listener still get a full, guaranteed drain. - cdc: add ErrSlotBusy sentinel + 55006 classifier; the bounded drain detects a busy slot via an up-front active-pid check plus a StartReplication backstop - queries: extract a reusable GetActiveSlotPID helper - mtree: skip the CDC drain per-node on a busy slot with a warning, and surface cdc_skipped_nodes on both the update and table-diff paths - tests: integration test asserting diff/update succeed (and that the skip actually occurs) while listen holds the slot - docs: document the concurrent behavior and best-effort freshness caveat --- db/queries/queries.go | 18 ++++- docs/commands/mtree/index.md | 8 +++ docs/commands/mtree/mtree-listen.md | 6 ++ docs/commands/mtree/mtree-table-diff.md | 6 ++ internal/consistency/mtree/merkle.go | 30 ++++++++- internal/infra/cdc/listen.go | 35 ++++++++++ internal/infra/cdc/listen_test.go | 61 +++++++++++++++++ tests/integration/merkle_tree_test.go | 89 +++++++++++++++++++++++++ 8 files changed, 249 insertions(+), 4 deletions(-) create mode 100644 internal/infra/cdc/listen_test.go diff --git a/db/queries/queries.go b/db/queries/queries.go index 8f9bd71..3b20263 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2838,16 +2838,28 @@ func DropPublication(ctx context.Context, db DBQuerier, publicationName string) return nil } -func DropReplicationSlot(ctx context.Context, db DBQuerier, slotName string) error { +// GetActiveSlotPID returns the PID of the active consumer holding the named +// logical replication slot, or nil if the slot exists but is not currently +// active (or does not exist). Used to detect a running `mtree listen` before a +// bounded CDC drain attempts to attach to the same slot. +func GetActiveSlotPID(ctx context.Context, db DBQuerier, slotName string) (*int32, error) { var pid *int32 pidSQL, err := RenderSQL(SQLTemplates.GetReplicationSlotPID, nil) if err != nil { - return fmt.Errorf("failed to render GetReplicationSlotPID SQL: %w", err) + return nil, fmt.Errorf("failed to render GetReplicationSlotPID SQL: %w", err) } err = db.QueryRow(ctx, pidSQL, slotName).Scan(&pid) if err != nil && err != pgx.ErrNoRows { - return fmt.Errorf("query to get replication slot PID failed for slot %s: %w", slotName, err) + return nil, fmt.Errorf("query to get replication slot PID failed for slot %s: %w", slotName, err) + } + return pid, nil +} + +func DropReplicationSlot(ctx context.Context, db DBQuerier, slotName string) error { + pid, err := GetActiveSlotPID(ctx, db, slotName) + if err != nil { + return err } if pid != nil { diff --git a/docs/commands/mtree/index.md b/docs/commands/mtree/index.md index 6547f6f..48676e2 100644 --- a/docs/commands/mtree/index.md +++ b/docs/commands/mtree/index.md @@ -47,6 +47,14 @@ Finally, you can use the diff file to initiate table repair with the ACE [table- Running `mtree listen` can help keep trees current; every `mtree table-diff` also performs an on-demand update before comparing. +`mtree listen` and `mtree table-diff` / `mtree update` can run at the same time. +When a diff or update runs while `listen` holds a node's replication slot, ACE +skips that node's CDC catch-up and compares against the tree `listen` is already +maintaining. A warning is printed, and the result reflects `listen`'s +last-applied state — it may omit changes newer than `listen`'s most recent +apply. For a diff that is guaranteed current to the present moment, stop +`mtree listen` first so the diff can perform its own bounded CDC drain. + ### Building Merkle Trees in Parallel (for Very Large Tables) If a table is extremely large (e.g., ~1B rows or ~1 TB), remote building the Merkle tree from a single ACE node can be slowed by network latency. You can parallelize the build (per node) to speed up the process. diff --git a/docs/commands/mtree/mtree-listen.md b/docs/commands/mtree/mtree-listen.md index 543b5b7..ae17b50 100644 --- a/docs/commands/mtree/mtree-listen.md +++ b/docs/commands/mtree/mtree-listen.md @@ -25,3 +25,9 @@ Starts a long-running process to listen for changes and auto-update Merkle trees ```sh ./ace mtree listen --dbname=mydatabase my-cluster ``` + +You no longer need to stop `mtree listen` before running `mtree table-diff` or +`mtree update`. While `listen` is active, those commands skip their own CDC +drain for the affected node and compare against the tree `listen` maintains +(best-effort freshness). Stop `listen` first only when you need a diff that is +guaranteed current to the present instant. diff --git a/docs/commands/mtree/mtree-table-diff.md b/docs/commands/mtree/mtree-table-diff.md index 694d664..1b2ceaa 100644 --- a/docs/commands/mtree/mtree-table-diff.md +++ b/docs/commands/mtree/mtree-table-diff.md @@ -3,6 +3,12 @@ Compares Merkle trees of a table across nodes to find inconsistencies. By default, updates trees first using CDC. +If `mtree listen` is running and holds the replication slot, `table-diff` skips +the CDC drain for that node (printing a warning) and compares against the +listen-maintained tree instead of failing. The comparison reflects `listen`'s +last-applied state. Stop `mtree listen` first if you need a guaranteed-current +drain. + **Usage** ``` diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index a7bb061..c53fab1 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -86,6 +86,8 @@ type MerkleTreeTask struct { Mode string NoCDC bool CDCTimeoutSec int // per-invocation CDC drain budget; 0 = use config/default + // CDCSkippedNodes lists nodes whose CDC drain was skipped because a running `mtree listen` held the replication slot (best-effort mode). + CDCSkippedNodes []string SkipDBUpdate bool Until string @@ -1615,12 +1617,25 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { //nolint: // (in node order) is returned, preserving the prior semantics. var wg sync.WaitGroup cdcErrs := make([]error, len(m.ClusterNodes)) + skipped := make([]string, len(m.ClusterNodes)) for i, nodeInfo := range m.ClusterNodes { wg.Add(1) go func(i int, nodeInfo map[string]any) { defer wg.Done() if err := cdc.UpdateFromCDC(ctx, nodeInfo); err != nil { - cdcErrs[i] = fmt.Errorf("CDC update failed for node %s: %w", nodeInfo["Name"], err) + name := fmt.Sprintf("%v", nodeInfo["Name"]) + // A busy slot means `mtree listen` is draining this node. Skip + // its CDC catch-up (best-effort) and compare against the tree + // listen is keeping warm, rather than failing the whole run. + if errors.Is(err, cdc.ErrSlotBusy) { + logger.Warn("node %s: 'mtree listen' is active on the replication slot; "+ + "skipping CDC drain and comparing against the listen-maintained tree. "+ + "Result may omit changes newer than listen's last apply. For a "+ + "guaranteed-current diff, stop 'mtree listen' first. (%v)", name, err) + skipped[i] = name + return + } + cdcErrs[i] = fmt.Errorf("CDC update failed for node %s: %w", name, err) } }(i, nodeInfo) } @@ -1630,6 +1645,16 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { //nolint: return e } } + var skippedNodes []string + for _, n := range skipped { + if n != "" { + skippedNodes = append(skippedNodes, n) + } + } + if len(skippedNodes) > 0 { + resultCtx["cdc_skipped_nodes"] = skippedNodes + m.CDCSkippedNodes = skippedNodes + } } var blockSize int @@ -2008,6 +2033,9 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { if err = m.UpdateMtree(true); err != nil { return fmt.Errorf("failed to update merkle tree before diff: %w", err) } + if len(m.CDCSkippedNodes) > 0 { + resultCtx["cdc_skipped_nodes"] = m.CDCSkippedNodes + } if err := m.loadNodeOriginNames(); err != nil { logger.Warn("mtree diff: unable to load node origin names; using raw node_origin values: %v", err) } diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 41a9779..f2f881f 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -13,6 +13,7 @@ package cdc import ( "context" + "errors" "fmt" "strings" "sync" @@ -29,6 +30,21 @@ import ( "github.com/pgedge/ace/pkg/logger" ) +// ErrSlotBusy signals that a bounded CDC drain could not run because the +// node's logical replication slot is already held by another active consumer +// (typically a running `mtree listen`). Bounded callers (UpdateMtree) treat +// this as a per-node skip rather than a fatal error; the continuous listener +// never returns it as terminal. +var ErrSlotBusy = errors.New("replication slot is held by another consumer") + +// isSlotBusyErr reports whether err wraps a PostgreSQL "object in use" error +// (SQLSTATE 55006), which is what StartReplication returns when the slot is +// already active for another PID. +func isSlotBusyErr(err error) bool { + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && pgErr.Code == "55006" +} + // Since we're using the native pgoutput plugin, there are some wire protocol // specifics that need to be used for parsing the messages. pglogrepl helps // abstract away a lot of the low-level details, but wherever raw codes, flags, OIDs, @@ -210,12 +226,31 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont if err := pglogrepl.StartReplication(ctx, c, slotName, lastLSN, opts); err != nil { c.Close(context.Background()) + // If listen acquired the slot in the race window after the up-front + // check, surface it as ErrSlotBusy so a bounded caller skips rather + // than fails. (Continuous mode just retries via its reconnect loop.) + if isSlotBusyErr(err) { + return nil, fmt.Errorf("%w: %v", ErrSlotBusy, err) + } return nil, fmt.Errorf("StartReplication failed: %w", err) } logger.Info("Logical replication started on slot %s from LSN %s", slotName, lastLSN) return c, nil } + // Best-effort cooperation with a running `mtree listen`: a bounded drain + // cannot attach to a slot that listen already holds (PostgreSQL allows one + // active consumer per slot). Detect that up front and return ErrSlotBusy so + // the caller can skip this node's drain instead of failing. Continuous mode + // is the legitimate slot owner and is exempt. + if !continuous { + if pid, perr := queries.GetActiveSlotPID(ctx, pool, slotName); perr != nil { + return fmt.Errorf("failed to check replication slot status: %w", perr) + } else if pid != nil { + return fmt.Errorf("%w (slot %s, pid %d)", ErrSlotBusy, slotName, *pid) + } + } + conn, err = connect() if err != nil { logger.Error("initial connection failed: %v", err) diff --git a/internal/infra/cdc/listen_test.go b/internal/infra/cdc/listen_test.go new file mode 100644 index 0000000..45a9818 --- /dev/null +++ b/internal/infra/cdc/listen_test.go @@ -0,0 +1,61 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +package cdc + +import ( + "errors" + "fmt" + "testing" + + "github.com/jackc/pgx/v5/pgconn" +) + +func TestIsSlotBusyErr(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "direct 55006 PgError", + err: &pgconn.PgError{Code: "55006", Message: "replication slot is active"}, + want: true, + }, + { + name: "wrapped 55006 PgError", + err: fmt.Errorf("StartReplication failed: %w", &pgconn.PgError{Code: "55006"}), + want: true, + }, + { + name: "different SQLSTATE", + err: &pgconn.PgError{Code: "42704"}, + want: false, + }, + { + name: "plain error", + err: errors.New("some other failure"), + want: false, + }, + { + name: "nil error", + err: nil, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isSlotBusyErr(tt.err); got != tt.want { + t.Errorf("isSlotBusyErr() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/tests/integration/merkle_tree_test.go b/tests/integration/merkle_tree_test.go index 5d5ab6d..bb50d16 100644 --- a/tests/integration/merkle_tree_test.go +++ b/tests/integration/merkle_tree_test.go @@ -217,6 +217,9 @@ func runMerkleTreeTests(t *testing.T, env *testEnv, tableName string) { t.Run("ContinuousCDC", func(t *testing.T) { testMerkleTreeContinuousCDC(t, env, tableName) }) + t.Run("DiffWhileListenActive", func(t *testing.T) { + testMerkleTreeDiffWhileListenActive(t, env, tableName) + }) } t.Run("Teardown", func(t *testing.T) { testMerkleTreeTeardown(t, env, tableName) @@ -733,6 +736,92 @@ func testMerkleTreeContinuousCDC(t *testing.T, env *testEnv, tableName string) { require.NoError(t, err, "ExecuteTask should succeed") } +// testMerkleTreeDiffWhileListenActive proves that table-diff and update no +// longer fail with a "replication slot is active" (SQLSTATE 55006) error +// when `mtree listen` is concurrently holding the CDC replication slot. +// Earlier work on this branch made the bounded CDC drain return +// cdc.ErrSlotBusy and made UpdateMtree skip that node's drain with a +// warning instead of failing; this test confirms the end-to-end fix. +func testMerkleTreeDiffWhileListenActive(t *testing.T, env *testEnv, tableName string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + // Mirrors testMerkleTreeContinuousCDC: the CDC listener and merkle tree + // work run only on n1, so we don't need n2 in the task's node list. + nodes := []string{env.ServiceN1} + mtreeTask := env.newMerkleTreeTask(t, qualifiedTableName, nodes) + + err := mtreeTask.RunChecks(false) + require.NoError(t, err, "RunChecks should succeed") + err = mtreeTask.MtreeInit() + require.NoError(t, err, "MtreeInit should succeed") + t.Cleanup(func() { + err := mtreeTask.MtreeTeardown() + if err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + env.repairTable(t, qualifiedTableName, env.ServiceN2) + files, _ := filepath.Glob("*_diffs-*.json") + for _, f := range files { + os.Remove(f) + } + }) + + err = mtreeTask.BuildMtree() + require.NoError(t, err, "BuildMtree should succeed") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + nodeInfo := env.ClusterNodes[0] + cdc.ListenForChanges(ctx, nodeInfo) + }() + + time.Sleep(3 * time.Second) // let the listener attach to the slot + + pool := env.N1Pool + tx, err := pool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + env.withRepairModeTx(t, ctx, tx, func() { + if mtreeTask.SimplePrimaryKey { + updateSQL := fmt.Sprintf("UPDATE %s SET email = 'diff.while.listen.active@example.com' WHERE index = 1", qualifiedTableName) + _, err = tx.Exec(ctx, updateSQL) + require.NoError(t, err) + } else { + var customerID string + err := pool.QueryRow(ctx, fmt.Sprintf("SELECT customer_id FROM %s WHERE index = 1 LIMIT 1", qualifiedTableName)).Scan(&customerID) + require.NoError(t, err, "could not get customer_id for index 1") + updateSQL := fmt.Sprintf("UPDATE %s SET email = 'diff.while.listen.active@example.com' WHERE index = 1 AND customer_id = $1", qualifiedTableName) + _, err = tx.Exec(ctx, updateSQL, customerID) + require.NoError(t, err) + } + }) + require.NoError(t, tx.Commit(ctx)) + + time.Sleep(2 * time.Second) // give listen time to mark the block dirty + + // The core assertion: table-diff must NOT fail with a slot-active error + // while listen holds the slot. + err = mtreeTask.DiffMtree() + require.NoError(t, err, "table-diff should succeed while listen is active") + + // update must likewise not fail with a slot-active error. + require.NoError(t, mtreeTask.UpdateMtree(true), + "update should succeed while listen is active") + + // Prove the busy-slot SKIP path actually executed, rather than merely + // not erroring (e.g. because the slot happened to be free). + require.NotEmpty(t, mtreeTask.CDCSkippedNodes, + "expected the node's CDC drain to be skipped while listen holds the slot") + + cancel() + wg.Wait() +} + type mergeCase string const ( From 3b006d612a9fd8bf0a4c69533f535decb18d2354 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Tue, 30 Jun 2026 17:56:44 -0700 Subject: [PATCH 2/2] fix(mtree): surface best-effort CDC skip and stop over-claiming listen A busy slot can be held by any consumer (including a concurrent bounded mtree op sharing the node's slot), not only mtree listen. Make the skip visible instead of implying a guaranteed comparison: - add cdc_skipped_nodes to DiffSummary so it appears in the diff report - warn at end of diff when nodes were skipped (fires even if trees match) - reword the per-node warning to not assert the holder is listen and to note divergence can be under-reported - docs: shared per-node slot, concurrent-op trigger, best-effort caveat --- docs/commands/mtree/index.md | 17 +++++++---- docs/commands/mtree/mtree-table-diff.md | 14 +++++---- internal/consistency/mtree/merkle.go | 38 +++++++++++++++++-------- pkg/types/types.go | 6 ++++ 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/docs/commands/mtree/index.md b/docs/commands/mtree/index.md index 48676e2..b2fad00 100644 --- a/docs/commands/mtree/index.md +++ b/docs/commands/mtree/index.md @@ -48,12 +48,17 @@ Finally, you can use the diff file to initiate table repair with the ACE [table- Running `mtree listen` can help keep trees current; every `mtree table-diff` also performs an on-demand update before comparing. `mtree listen` and `mtree table-diff` / `mtree update` can run at the same time. -When a diff or update runs while `listen` holds a node's replication slot, ACE -skips that node's CDC catch-up and compares against the tree `listen` is already -maintaining. A warning is printed, and the result reflects `listen`'s -last-applied state — it may omit changes newer than `listen`'s most recent -apply. For a diff that is guaranteed current to the present moment, stop -`mtree listen` first so the diff can perform its own bounded CDC drain. +Each node has a single replication slot shared across all of its Merkle trees, +and PostgreSQL allows only one active consumer per slot. So whenever a diff or +update finds a node's slot already held by another consumer — normally a running +`mtree listen`, but also a concurrent `mtree table-diff`/`update` on the same +node (even for a different table) — ACE skips that node's CDC catch-up and +compares against the tree already maintained on that node. A warning is printed, +the skipped nodes are listed in the diff summary (`cdc_skipped_nodes`), and the +result is best-effort: it may omit changes newer than the last apply, so +divergence can be under-reported. For a diff that is guaranteed current to the +present moment, ensure no `mtree listen` or other mtree operation is holding the +node's slot, then re-run so the diff can perform its own bounded CDC drain. ### Building Merkle Trees in Parallel (for Very Large Tables) diff --git a/docs/commands/mtree/mtree-table-diff.md b/docs/commands/mtree/mtree-table-diff.md index 1b2ceaa..5e06170 100644 --- a/docs/commands/mtree/mtree-table-diff.md +++ b/docs/commands/mtree/mtree-table-diff.md @@ -3,11 +3,15 @@ Compares Merkle trees of a table across nodes to find inconsistencies. By default, updates trees first using CDC. -If `mtree listen` is running and holds the replication slot, `table-diff` skips -the CDC drain for that node (printing a warning) and compares against the -listen-maintained tree instead of failing. The comparison reflects `listen`'s -last-applied state. Stop `mtree listen` first if you need a guaranteed-current -drain. +If a node's replication slot is already held by another consumer — normally a +running `mtree listen`, but also a concurrent `table-diff`/`update` on the same +node (the slot is shared across all of a node's Merkle trees) — `table-diff` +skips the CDC drain for that node (printing a warning) and compares against the +already-maintained tree instead of failing. The comparison is best-effort and +may omit the most recent changes on those nodes, so divergence can be +under-reported; the skipped nodes are listed in the diff summary +(`cdc_skipped_nodes`). Ensure no `mtree listen` or other mtree operation is +holding the node's slot, then re-run, if you need a guaranteed-current drain. **Usage** diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index c53fab1..3a260a1 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -1624,14 +1624,18 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { //nolint: defer wg.Done() if err := cdc.UpdateFromCDC(ctx, nodeInfo); err != nil { name := fmt.Sprintf("%v", nodeInfo["Name"]) - // A busy slot means `mtree listen` is draining this node. Skip - // its CDC catch-up (best-effort) and compare against the tree - // listen is keeping warm, rather than failing the whole run. + // The slot is held by another active consumer -- typically a + // running `mtree listen`, but possibly a concurrent bounded mtree + // operation sharing this node's slot. Skip this node's CDC + // catch-up (best-effort) and compare against the already-maintained + // tree rather than failing the whole run. if errors.Is(err, cdc.ErrSlotBusy) { - logger.Warn("node %s: 'mtree listen' is active on the replication slot; "+ - "skipping CDC drain and comparing against the listen-maintained tree. "+ - "Result may omit changes newer than listen's last apply. For a "+ - "guaranteed-current diff, stop 'mtree listen' first. (%v)", name, err) + logger.Warn("node %s: replication slot is held by another consumer "+ + "(expected: 'mtree listen'); skipping this node's CDC drain and comparing "+ + "against the already-maintained tree (best-effort). Recent changes may be "+ + "omitted, so divergence can be under-reported. For a guaranteed-current "+ + "diff, ensure no 'mtree listen' or other mtree operation is holding the "+ + "slot, then re-run. (%v)", name, err) skipped[i] = name return } @@ -2053,11 +2057,12 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { m.DiffResult = types.DiffOutput{ NodeDiffs: make(map[string]types.DiffByNodePair), Summary: types.DiffSummary{ - Schema: m.Schema, - Table: m.Table, - Nodes: m.NodeList, - StartTime: time.Now().Format(time.RFC3339), - DiffRowsCount: make(map[string]int), + Schema: m.Schema, + Table: m.Table, + Nodes: m.NodeList, + StartTime: time.Now().Format(time.RFC3339), + DiffRowsCount: make(map[string]int), + CDCSkippedNodes: m.CDCSkippedNodes, }, } m.diffRowKeySets = make(map[string]map[string]map[string]struct{}) @@ -2167,6 +2172,15 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { resultCtx["mismatched_pairs"] = len(m.DiffResult.NodeDiffs) + if len(m.CDCSkippedNodes) > 0 { + logger.Warn("diff completed in best-effort mode: the CDC drain was skipped for "+ + "node(s) %v because the replication slot was held by another consumer "+ + "(expected: 'mtree listen'). Recent changes on those node(s) may not be "+ + "reflected, so divergence can be under-reported. Re-run with no concurrent "+ + "'mtree listen' or other mtree operation holding the slot for a "+ + "guaranteed-current diff.", m.CDCSkippedNodes) + } + return nil } diff --git a/pkg/types/types.go b/pkg/types/types.go index 9889870..c68826c 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -124,6 +124,12 @@ type DiffSummary struct { AgainstOriginResolved string `json:"against_origin_resolved,omitempty"` Until string `json:"until,omitempty"` OriginOnly bool `json:"origin_only,omitempty"` + // CDCSkippedNodes lists nodes whose pre-diff CDC drain was skipped because + // the replication slot was held by another consumer (typically a running + // mtree listen, but possibly a concurrent bounded mtree operation sharing + // the node's slot). The comparison for those nodes is best-effort and may + // omit their most recent changes, so divergence can be under-reported. + CDCSkippedNodes []string `json:"cdc_skipped_nodes,omitempty"` } type KVPair struct {