Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -3006,6 +3006,25 @@ func CreateSchema(ctx context.Context, db DBQuerier, schemaName string) error {
return nil
}

// RegisterSkipSchema asks the local Spock node to exclude schemaName from
// replication by appending it to spock.node.info->'skip_schema'. The call is
// idempotent: a schema already in the array is left untouched. Returns the
// number of rows updated (0 when the schema is already registered or the
// local node row is missing).
func RegisterSkipSchema(ctx context.Context, db DBQuerier, schemaName string) (int64, error) {
sql, err := RenderSQL(SQLTemplates.RegisterSkipSchema, nil)
if err != nil {
return 0, fmt.Errorf("failed to render RegisterSkipSchema SQL: %w", err)
}

tag, err := db.Exec(ctx, sql, schemaName)
if err != nil {
return 0, fmt.Errorf("query to register skip_schema failed: %w", err)
}

return tag.RowsAffected(), nil
}

func ResetPositionsByStartFromTemp(ctx context.Context, db DBQuerier, mtreeTable string, offset int64) error {
data := map[string]any{
"MtreeTable": mtreeTable,
Expand Down
18 changes: 18 additions & 0 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Templates struct {
CheckRepSetExists *template.Template
GetTablesInRepSet *template.Template
GetPkeyColumnTypes *template.Template
RegisterSkipSchema *template.Template

CreateMetadataTable *template.Template
GetPkeyOffsets *template.Template
Expand Down Expand Up @@ -686,6 +687,23 @@ var SQLTemplates = Templates{
GetTablesInRepSet: template.Must(template.New("getTablesInRepSet").Parse(
`SELECT concat_ws('.', nspname, relname) FROM spock.tables where set_name = $1;`,
)),
// RegisterSkipSchema appends $1 to spock.node.info->'skip_schema' for
// the local node, leaving any pre-existing entries intact. The NOT (... ? $1)
// guard makes the statement idempotent: running it twice with the same
// schema name updates zero rows the second time. spock.local_node is a
// view, so the UPDATE goes through spock.node and joins on node_id.
RegisterSkipSchema: template.Must(template.New("registerSkipSchema").Parse(`
UPDATE spock.node n
SET info = jsonb_set(
coalesce(n.info, '{}'::jsonb),
'{skip_schema}',
coalesce(n.info->'skip_schema', '[]'::jsonb) || to_jsonb($1::text),
true
)
FROM spock.local_node l
WHERE n.node_id = l.node_id
AND NOT (coalesce(n.info->'skip_schema', '[]'::jsonb) ? $1);
`)),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
GetPkeyColumnTypes: template.Must(template.New("getPkeyColumnTypes").Parse(`
SELECT
a.attname,
Expand Down
13 changes: 13 additions & 0 deletions internal/consistency/mtree/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,19 @@ func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, s
return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err)
}

// Ask Spock to exclude the ACE schema from replication on this node.
// ACE metadata (Merkle trees, CDC bookkeeping) is node-local by design;
// replicating it would corrupt peers and is what skip_schema is for.
// Best-effort: a missing or older Spock should not block MtreeInit, so
// we log and continue instead of returning.
if rows, err := queries.RegisterSkipSchema(m.Ctx, pool, m.aceSchema()); err != nil {
logger.Warn("failed to register skip_schema '%s' with Spock on node %s: %v",
m.aceSchema(), nodeInfo["Name"], err)
} else if rows > 0 {
logger.Info("registered skip_schema '%s' with Spock on node: %s",
m.aceSchema(), nodeInfo["Name"])
}

var pubCommitLSN string
if err := func() (err error) {
tx, err := pool.Begin(m.Ctx)
Expand Down
Loading