From b786cbcd169a7a05a722e30c487bcbc867d51a8e Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 26 Jun 2026 13:42:04 +0200 Subject: [PATCH] Register ACE schema with Spock skip_schema on MtreeInit Append the configured mtree.schema to spock.node.info->'skip_schema' right after creating it on each node. The UPDATE is additive and idempotent, so operator-set entries are preserved and re-runs are noops. Best-effort: failures log a warning instead of aborting init. Co-Authored-By: Claude Opus 4.7 (1M context) --- db/queries/queries.go | 19 +++++++++++++++++++ db/queries/templates.go | 18 ++++++++++++++++++ internal/consistency/mtree/merkle.go | 13 +++++++++++++ 3 files changed, 50 insertions(+) diff --git a/db/queries/queries.go b/db/queries/queries.go index 8f9bd71..0c4a2f8 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -3006,6 +3006,25 @@ func CreateSchema(ctx context.Context, db DBQuerier, schemaName string) error { return nil } +// RegisterSkipSchema asks the local Spock node to exclude schemaName from +// replication by appending it to spock.node.info->'skip_schema'. The call is +// idempotent: a schema already in the array is left untouched. Returns the +// number of rows updated (0 when the schema is already registered or the +// local node row is missing). +func RegisterSkipSchema(ctx context.Context, db DBQuerier, schemaName string) (int64, error) { + sql, err := RenderSQL(SQLTemplates.RegisterSkipSchema, nil) + if err != nil { + return 0, fmt.Errorf("failed to render RegisterSkipSchema SQL: %w", err) + } + + tag, err := db.Exec(ctx, sql, schemaName) + if err != nil { + return 0, fmt.Errorf("query to register skip_schema failed: %w", err) + } + + return tag.RowsAffected(), nil +} + func ResetPositionsByStartFromTemp(ctx context.Context, db DBQuerier, mtreeTable string, offset int64) error { data := map[string]any{ "MtreeTable": mtreeTable, diff --git a/db/queries/templates.go b/db/queries/templates.go index c4b54fa..199478d 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -42,6 +42,7 @@ type Templates struct { CheckRepSetExists *template.Template GetTablesInRepSet *template.Template GetPkeyColumnTypes *template.Template + RegisterSkipSchema *template.Template CreateMetadataTable *template.Template GetPkeyOffsets *template.Template @@ -686,6 +687,23 @@ var SQLTemplates = Templates{ GetTablesInRepSet: template.Must(template.New("getTablesInRepSet").Parse( `SELECT concat_ws('.', nspname, relname) FROM spock.tables where set_name = $1;`, )), + // RegisterSkipSchema appends $1 to spock.node.info->'skip_schema' for + // the local node, leaving any pre-existing entries intact. The NOT (... ? $1) + // guard makes the statement idempotent: running it twice with the same + // schema name updates zero rows the second time. spock.local_node is a + // view, so the UPDATE goes through spock.node and joins on node_id. + RegisterSkipSchema: template.Must(template.New("registerSkipSchema").Parse(` + UPDATE spock.node n + SET info = jsonb_set( + coalesce(n.info, '{}'::jsonb), + '{skip_schema}', + coalesce(n.info->'skip_schema', '[]'::jsonb) || to_jsonb($1::text), + true + ) + FROM spock.local_node l + WHERE n.node_id = l.node_id + AND NOT (coalesce(n.info->'skip_schema', '[]'::jsonb) ? $1); + `)), GetPkeyColumnTypes: template.Must(template.New("getPkeyColumnTypes").Parse(` SELECT a.attname, diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 4f485d0..558f9d1 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -841,6 +841,19 @@ func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, s return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err) } + // Ask Spock to exclude the ACE schema from replication on this node. + // ACE metadata (Merkle trees, CDC bookkeeping) is node-local by design; + // replicating it would corrupt peers and is what skip_schema is for. + // Best-effort: a missing or older Spock should not block MtreeInit, so + // we log and continue instead of returning. + if rows, err := queries.RegisterSkipSchema(m.Ctx, pool, m.aceSchema()); err != nil { + logger.Warn("failed to register skip_schema '%s' with Spock on node %s: %v", + m.aceSchema(), nodeInfo["Name"], err) + } else if rows > 0 { + logger.Info("registered skip_schema '%s' with Spock on node: %s", + m.aceSchema(), nodeInfo["Name"]) + } + var pubCommitLSN string if err := func() (err error) { tx, err := pool.Begin(m.Ctx)