Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2838,16 +2838,28 @@ func DropPublication(ctx context.Context, db DBQuerier, publicationName string)
return nil
}

func DropReplicationSlot(ctx context.Context, db DBQuerier, slotName string) error {
// GetActiveSlotPID returns the PID of the active consumer holding the named
// logical replication slot, or nil if the slot exists but is not currently
// active (or does not exist). Used to detect a running `mtree listen` before a
// bounded CDC drain attempts to attach to the same slot.
func GetActiveSlotPID(ctx context.Context, db DBQuerier, slotName string) (*int32, error) {
var pid *int32
pidSQL, err := RenderSQL(SQLTemplates.GetReplicationSlotPID, nil)
if err != nil {
return fmt.Errorf("failed to render GetReplicationSlotPID SQL: %w", err)
return nil, fmt.Errorf("failed to render GetReplicationSlotPID SQL: %w", err)
}

err = db.QueryRow(ctx, pidSQL, slotName).Scan(&pid)
if err != nil && err != pgx.ErrNoRows {
return fmt.Errorf("query to get replication slot PID failed for slot %s: %w", slotName, err)
return nil, fmt.Errorf("query to get replication slot PID failed for slot %s: %w", slotName, err)
}
return pid, nil
}

func DropReplicationSlot(ctx context.Context, db DBQuerier, slotName string) error {
pid, err := GetActiveSlotPID(ctx, db, slotName)
if err != nil {
return err
}

if pid != nil {
Expand Down
13 changes: 13 additions & 0 deletions docs/commands/mtree/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ Finally, you can use the diff file to initiate table repair with the ACE [table-

Running `mtree listen` can help keep trees current; every `mtree table-diff` also performs an on-demand update before comparing.

`mtree listen` and `mtree table-diff` / `mtree update` can run at the same time.
Each node has a single replication slot shared across all of its Merkle trees,
and PostgreSQL allows only one active consumer per slot. So whenever a diff or
update finds a node's slot already held by another consumer — normally a running
`mtree listen`, but also a concurrent `mtree table-diff`/`update` on the same
node (even for a different table) — ACE skips that node's CDC catch-up and
compares against the tree already maintained on that node. A warning is printed,
the skipped nodes are listed in the diff summary (`cdc_skipped_nodes`), and the
result is best-effort: it may omit changes newer than the last apply, so
divergence can be under-reported. For a diff that is guaranteed current to the
present moment, ensure no `mtree listen` or other mtree operation is holding the
node's slot, then re-run so the diff can perform its own bounded CDC drain.

### Building Merkle Trees in Parallel (for Very Large Tables)

If a table is extremely large (e.g., ~1B rows or ~1 TB), remote building the Merkle tree from a single ACE node can be slowed by network latency. You can parallelize the build (per node) to speed up the process.
Expand Down
6 changes: 6 additions & 0 deletions docs/commands/mtree/mtree-listen.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ Starts a long-running process to listen for changes and auto-update Merkle trees
```sh
./ace mtree listen --dbname=mydatabase my-cluster
```

You no longer need to stop `mtree listen` before running `mtree table-diff` or
`mtree update`. While `listen` is active, those commands skip their own CDC
drain for the affected node and compare against the tree `listen` maintains
(best-effort freshness). Stop `listen` first only when you need a diff that is
guaranteed current to the present instant.
10 changes: 10 additions & 0 deletions docs/commands/mtree/mtree-table-diff.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
Compares Merkle trees of a table across nodes to find inconsistencies.
By default, updates trees first using CDC.

If a node's replication slot is already held by another consumer — normally a
running `mtree listen`, but also a concurrent `table-diff`/`update` on the same
node (the slot is shared across all of a node's Merkle trees) — `table-diff`
skips the CDC drain for that node (printing a warning) and compares against the
already-maintained tree instead of failing. The comparison is best-effort and
may omit the most recent changes on those nodes, so divergence can be
under-reported; the skipped nodes are listed in the diff summary
(`cdc_skipped_nodes`). Ensure no `mtree listen` or other mtree operation is
holding the node's slot, then re-run, if you need a guaranteed-current drain.

**Usage**

```
Expand Down
54 changes: 48 additions & 6 deletions internal/consistency/mtree/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type MerkleTreeTask struct {
Mode string
NoCDC bool
CDCTimeoutSec int // per-invocation CDC drain budget; 0 = use config/default
// CDCSkippedNodes lists nodes whose CDC drain was skipped because a running `mtree listen` held the replication slot (best-effort mode).
CDCSkippedNodes []string
SkipDBUpdate bool
Until string

Expand Down Expand Up @@ -1615,12 +1617,29 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { //nolint:
// (in node order) is returned, preserving the prior semantics.
var wg sync.WaitGroup
cdcErrs := make([]error, len(m.ClusterNodes))
skipped := make([]string, len(m.ClusterNodes))
for i, nodeInfo := range m.ClusterNodes {
wg.Add(1)
go func(i int, nodeInfo map[string]any) {
defer wg.Done()
if err := cdc.UpdateFromCDC(ctx, nodeInfo); err != nil {
cdcErrs[i] = fmt.Errorf("CDC update failed for node %s: %w", nodeInfo["Name"], err)
name := fmt.Sprintf("%v", nodeInfo["Name"])
// The slot is held by another active consumer -- typically a
// running `mtree listen`, but possibly a concurrent bounded mtree
// operation sharing this node's slot. Skip this node's CDC
// catch-up (best-effort) and compare against the already-maintained
// tree rather than failing the whole run.
if errors.Is(err, cdc.ErrSlotBusy) {
logger.Warn("node %s: replication slot is held by another consumer "+
"(expected: 'mtree listen'); skipping this node's CDC drain and comparing "+
"against the already-maintained tree (best-effort). Recent changes may be "+
"omitted, so divergence can be under-reported. For a guaranteed-current "+
"diff, ensure no 'mtree listen' or other mtree operation is holding the "+
"slot, then re-run. (%v)", name, err)
skipped[i] = name
return
}
cdcErrs[i] = fmt.Errorf("CDC update failed for node %s: %w", name, err)
}
}(i, nodeInfo)
}
Expand All @@ -1630,6 +1649,16 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { //nolint:
return e
}
}
var skippedNodes []string
for _, n := range skipped {
if n != "" {
skippedNodes = append(skippedNodes, n)
}
}
if len(skippedNodes) > 0 {
resultCtx["cdc_skipped_nodes"] = skippedNodes
m.CDCSkippedNodes = skippedNodes
}
}

var blockSize int
Expand Down Expand Up @@ -2008,6 +2037,9 @@ func (m *MerkleTreeTask) DiffMtree() (err error) {
if err = m.UpdateMtree(true); err != nil {
return fmt.Errorf("failed to update merkle tree before diff: %w", err)
}
if len(m.CDCSkippedNodes) > 0 {
resultCtx["cdc_skipped_nodes"] = m.CDCSkippedNodes
}
if err := m.loadNodeOriginNames(); err != nil {
logger.Warn("mtree diff: unable to load node origin names; using raw node_origin values: %v", err)
}
Expand All @@ -2025,11 +2057,12 @@ func (m *MerkleTreeTask) DiffMtree() (err error) {
m.DiffResult = types.DiffOutput{
NodeDiffs: make(map[string]types.DiffByNodePair),
Summary: types.DiffSummary{
Schema: m.Schema,
Table: m.Table,
Nodes: m.NodeList,
StartTime: time.Now().Format(time.RFC3339),
DiffRowsCount: make(map[string]int),
Schema: m.Schema,
Table: m.Table,
Nodes: m.NodeList,
StartTime: time.Now().Format(time.RFC3339),
DiffRowsCount: make(map[string]int),
CDCSkippedNodes: m.CDCSkippedNodes,
},
}
m.diffRowKeySets = make(map[string]map[string]map[string]struct{})
Expand Down Expand Up @@ -2139,6 +2172,15 @@ func (m *MerkleTreeTask) DiffMtree() (err error) {

resultCtx["mismatched_pairs"] = len(m.DiffResult.NodeDiffs)

if len(m.CDCSkippedNodes) > 0 {
logger.Warn("diff completed in best-effort mode: the CDC drain was skipped for "+
"node(s) %v because the replication slot was held by another consumer "+
"(expected: 'mtree listen'). Recent changes on those node(s) may not be "+
"reflected, so divergence can be under-reported. Re-run with no concurrent "+
"'mtree listen' or other mtree operation holding the slot for a "+
"guaranteed-current diff.", m.CDCSkippedNodes)
}

return nil
}

Expand Down
35 changes: 35 additions & 0 deletions internal/infra/cdc/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package cdc

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand All @@ -29,6 +30,21 @@ import (
"github.com/pgedge/ace/pkg/logger"
)

// ErrSlotBusy signals that a bounded CDC drain could not run because the
// node's logical replication slot is already held by another active consumer
// (typically a running `mtree listen`). Bounded callers (UpdateMtree) treat
// this as a per-node skip rather than a fatal error; the continuous listener
// never returns it as terminal.
var ErrSlotBusy = errors.New("replication slot is held by another consumer")

// isSlotBusyErr reports whether err wraps a PostgreSQL "object in use" error
// (SQLSTATE 55006), which is what StartReplication returns when the slot is
// already active for another PID.
func isSlotBusyErr(err error) bool {
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) && pgErr.Code == "55006"
}

// Since we're using the native pgoutput plugin, there are some wire protocol
// specifics that need to be used for parsing the messages. pglogrepl helps
// abstract away a lot of the low-level details, but wherever raw codes, flags, OIDs,
Expand Down Expand Up @@ -210,12 +226,31 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont

if err := pglogrepl.StartReplication(ctx, c, slotName, lastLSN, opts); err != nil {
c.Close(context.Background())
// If listen acquired the slot in the race window after the up-front
// check, surface it as ErrSlotBusy so a bounded caller skips rather
// than fails. (Continuous mode just retries via its reconnect loop.)
if isSlotBusyErr(err) {
return nil, fmt.Errorf("%w: %v", ErrSlotBusy, err)
}
return nil, fmt.Errorf("StartReplication failed: %w", err)
}
logger.Info("Logical replication started on slot %s from LSN %s", slotName, lastLSN)
return c, nil
}

// Best-effort cooperation with a running `mtree listen`: a bounded drain
// cannot attach to a slot that listen already holds (PostgreSQL allows one
// active consumer per slot). Detect that up front and return ErrSlotBusy so
// the caller can skip this node's drain instead of failing. Continuous mode
// is the legitimate slot owner and is exempt.
if !continuous {
if pid, perr := queries.GetActiveSlotPID(ctx, pool, slotName); perr != nil {
return fmt.Errorf("failed to check replication slot status: %w", perr)
} else if pid != nil {
return fmt.Errorf("%w (slot %s, pid %d)", ErrSlotBusy, slotName, *pid)
Comment thread
mason-sharp marked this conversation as resolved.
}
}

conn, err = connect()
if err != nil {
logger.Error("initial connection failed: %v", err)
Expand Down
61 changes: 61 additions & 0 deletions internal/infra/cdc/listen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// ///////////////////////////////////////////////////////////////////////////
//
// # ACE - Active Consistency Engine
//
// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/)
//
// This software is released under the PostgreSQL License:
// https://opensource.org/license/postgresql
//
// ///////////////////////////////////////////////////////////////////////////

package cdc

import (
"errors"
"fmt"
"testing"

"github.com/jackc/pgx/v5/pgconn"
)

func TestIsSlotBusyErr(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{
name: "direct 55006 PgError",
err: &pgconn.PgError{Code: "55006", Message: "replication slot is active"},
want: true,
},
{
name: "wrapped 55006 PgError",
err: fmt.Errorf("StartReplication failed: %w", &pgconn.PgError{Code: "55006"}),
want: true,
},
{
name: "different SQLSTATE",
err: &pgconn.PgError{Code: "42704"},
want: false,
},
{
name: "plain error",
err: errors.New("some other failure"),
want: false,
},
{
name: "nil error",
err: nil,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isSlotBusyErr(tt.err); got != tt.want {
t.Errorf("isSlotBusyErr() = %v, want %v", got, tt.want)
}
})
}
}
6 changes: 6 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type DiffSummary struct {
AgainstOriginResolved string `json:"against_origin_resolved,omitempty"`
Until string `json:"until,omitempty"`
OriginOnly bool `json:"origin_only,omitempty"`
// CDCSkippedNodes lists nodes whose pre-diff CDC drain was skipped because
// the replication slot was held by another consumer (typically a running
// mtree listen, but possibly a concurrent bounded mtree operation sharing
// the node's slot). The comparison for those nodes is best-effort and may
// omit their most recent changes, so divergence can be under-reported.
CDCSkippedNodes []string `json:"cdc_skipped_nodes,omitempty"`
}

type KVPair struct {
Expand Down
Loading
Loading