diff --git a/internal/consistency/diff/table_diff.go b/internal/consistency/diff/table_diff.go index 3803fd5..c88e2c8 100644 --- a/internal/consistency/diff/table_diff.go +++ b/internal/consistency/diff/table_diff.go @@ -31,9 +31,7 @@ import ( "time" "github.com/google/uuid" - "github.com/jackc/pgtype" "github.com/jackc/pgx/v5" - pgxv5type "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/db/queries" auth "github.com/pgedge/ace/internal/infra/db" @@ -631,124 +629,7 @@ func (t *TableDiffTask) fetchRows(nodeName string, r Range) ([]types.OrderedMap, rowData := make(types.OrderedMap, len(colsDesc)) for i, colD := range colsDesc { - val := rowValues[i] - var processedVal any - if val == nil { - processedVal = nil - } else { - switch v := val.(type) { - case pgtype.Numeric: - if v.Status == pgtype.Present { - text, err := v.EncodeText(nil, nil) - if err == nil { - processedVal = utils.NormalizeNumericString(string(text)) - } else { - processedVal = v - } - } else { - processedVal = nil - } - case pgtype.Timestamp: - if v.Status == pgtype.Present { - processedVal = v.Time - } else { - processedVal = nil - } - case pgtype.Timestamptz: - if v.Status == pgtype.Present { - processedVal = v.Time - } else { - processedVal = nil - } - case pgtype.Date: - if v.Status == pgtype.Present { - processedVal = v.Time - } else { - processedVal = nil - } - case pgtype.Bytea: - if v.Status == pgtype.Present { - processedVal = v.Bytes - } else { - processedVal = nil - } - case pgtype.Interval: - if v.Status == pgtype.Present { - if encoded, err := v.EncodeText(nil, nil); err == nil { - processedVal = string(encoded) - } else { - processedVal = nil - } - } else { - processedVal = nil - } - case pgtype.UUID: - if v.Status == pgtype.Present { - processedVal = fmt.Sprintf("%x-%x-%x-%x-%x", - v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16]) - } else { - processedVal = nil - } - case [16]byte: // pgx/v5 returns UUIDs as [16]byte - // nil caught above - processedVal = fmt.Sprintf("%x-%x-%x-%x-%x", - v[0:4], v[4:6], v[6:8], v[8:10], v[10:16]) - case pgxv5type.Time: // pgx/v5 returns "time without time zone" as pgtype.Time - if v.Valid { - usec := v.Microseconds - hours := usec / 3_600_000_000 - usec -= hours * 3_600_000_000 - minutes := usec / 60_000_000 - usec -= minutes * 60_000_000 - seconds := usec / 1_000_000 - usec -= seconds * 1_000_000 - processedVal = fmt.Sprintf("%02d:%02d:%02d.%06d", hours, minutes, seconds, usec) - } else { - processedVal = nil - } - case time.Time: - processedVal = v - case string: - processedVal = v - case int8, int16, int32, int64, int, - uint8, uint16, uint32, uint64, uint, - float32, float64, bool: - processedVal = v - case pgtype.JSON, pgtype.JSONB: - if v.(interface{ GetStatus() pgtype.Status }).GetStatus() != pgtype.Present { - processedVal = nil - } else { - var dataHolder any - if assignable, ok := v.(interface{ AssignTo(dst any) error }); ok { - err := assignable.AssignTo(&dataHolder) - if err == nil { - if marshalled, mErr := json.Marshal(dataHolder); mErr == nil { - processedVal = string(marshalled) - } else { - processedVal = fmt.Sprint(dataHolder) - } - } else { - processedVal = nil - } - } else { - processedVal = nil - } - } - default: - if marshaler, ok := val.(json.Marshaler); ok { - if marshalled, err := marshaler.MarshalJSON(); err == nil { - processedVal = string(marshalled) - } else { - processedVal = fmt.Sprint(val) - } - } else if stringer, ok := val.(fmt.Stringer); ok { - processedVal = stringer.String() - } else { - processedVal = fmt.Sprint(val) - } - } - } - rowData[i] = types.KVPair{Key: string(colD.Name), Value: processedVal} + rowData[i] = types.KVPair{Key: string(colD.Name), Value: utils.NormalizeScannedValue(rowValues[i])} } results = append(results, rowData) } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index ee1fc9e..c5149b3 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -865,7 +865,10 @@ func processRows(rows pgx.Rows) ([]types.OrderedMap, error) { } rowMap := make(types.OrderedMap, len(fields)) for i, field := range fields { - rowMap[i] = types.KVPair{Key: string(field.Name), Value: values[i]} + // Normalise driver structs (pgtype.Time, [16]byte UUIDs, ...) to + // the canonical diff-report representation; raw pgx values would + // serialise as JSON objects that table-repair cannot convert back. + rowMap[i] = types.KVPair{Key: string(field.Name), Value: utils.NormalizeScannedValue(values[i])} } results = append(results, rowMap) } diff --git a/pkg/common/utils.go b/pkg/common/utils.go index 2f717bf..5c53c32 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -729,6 +729,22 @@ func ConvertToPgxType(val any, pgType string) (any, error) { // Fallback: let Postgres cast from the raw string return s, nil } + // Diff files written before scanned values were normalised carry the + // raw pgx struct form {"Microseconds":...,"Valid":...}; accept it so + // those files remain repairable. + if m, ok := val.(map[string]any); ok { + if valid, ok := m["Valid"].(bool); ok && !valid { + return nil, nil + } + switch usec := m["Microseconds"].(type) { + case json.Number: + if u, err := usec.Int64(); err == nil { + return pgxv5type.Time{Microseconds: u, Valid: true}, nil + } + case float64: + return pgxv5type.Time{Microseconds: int64(usec), Valid: true}, nil + } + } return nil, fmt.Errorf("expected time string (HH:MM:SS) for %s, got %v (%T)", pgType, val, val) case "timetz", "time with time zone": @@ -739,6 +755,21 @@ func ConvertToPgxType(val any, pgType string) (any, error) { } return nil, fmt.Errorf("expected time string for %s, got %v (%T)", pgType, val, val) + case "money": + // Both diff engines emit money as its locale-formatted string (e.g. + // "$532.96"), which Postgres parses directly on input; pass it through + // without the unknown-type warning. + if s, ok := val.(string); ok { + return s, nil + } + if n, ok := val.(json.Number); ok { + return n.String(), nil + } + if f, ok := val.(float64); ok { + return fmt.Sprint(f), nil + } + return nil, fmt.Errorf("expected money string for %s, got %v (%T)", pgType, val, val) + case "interval": if s, ok := val.(string); ok { return s, nil @@ -1131,6 +1162,139 @@ func rowsToMap(rows []types.OrderedMap, pkeyCols []string) (map[string]types.Ord return rowMap, nil } +// timeOrInfinity renders a pgtype v1 timestamp/date value, preserving the +// infinity modifier as its Postgres text form so repair can round-trip it +// instead of collapsing it to a zero time. +func timeOrInfinity(t time.Time, inf pgtype.InfinityModifier) any { + switch inf { + case pgtype.Infinity: + return "infinity" + case pgtype.NegativeInfinity: + return "-infinity" + default: + return t + } +} + +// NormalizeScannedValue converts a value scanned from a database row into the +// canonical representation ACE stores in diff reports: pgx driver structs +// (pgtype.Time, [16]byte UUIDs, intervals, numerics, ...) become plain strings +// or time.Time values that JSON-encode losslessly and that ConvertToPgxType can +// turn back into typed parameters during repair. Both diff engines must route +// scanned rows through it so a diff file is repairable regardless of which +// engine produced it. +func NormalizeScannedValue(val any) any { + if val == nil { + return nil + } + switch v := val.(type) { + case pgtype.Numeric: + if v.Status == pgtype.Present { + text, err := v.EncodeText(nil, nil) + if err == nil { + return NormalizeNumericString(string(text)) + } + return v + } + return nil + case pgtype.Timestamp: + if v.Status == pgtype.Present { + return timeOrInfinity(v.Time, v.InfinityModifier) + } + return nil + case pgtype.Timestamptz: + if v.Status == pgtype.Present { + return timeOrInfinity(v.Time, v.InfinityModifier) + } + return nil + case pgtype.Date: + if v.Status == pgtype.Present { + return timeOrInfinity(v.Time, v.InfinityModifier) + } + return nil + case pgtype.Bytea: + if v.Status == pgtype.Present { + return v.Bytes + } + return nil + case []byte: // pgx/v5 scans bytea as a raw byte slice + // Pass it through untouched: JSON encodes it as base64, which the + // repair-side bytea conversion decodes back. The generic fmt.Sprint + // fallback would render "[104 101 ...]" and corrupt the column on + // repair. + return v + case pgtype.Interval: + if v.Status == pgtype.Present { + if encoded, err := v.EncodeText(nil, nil); err == nil { + return string(encoded) + } + return nil + } + return nil + case pgxv5type.Interval: // pgx/v5 scans interval as pgtype.Interval + if !v.Valid { + return nil + } + // A fresh Map per call: pgtype.Map memoizes encode plans without + // locking, so a shared instance would race under the concurrent diff + // workers. Interval normalisation is rare enough not to matter. + if encoded, err := pgxv5type.NewMap().Encode(pgxv5type.IntervalOID, pgxv5type.TextFormatCode, v, nil); err == nil { + return string(encoded) + } + return fmt.Sprint(v) + case pgtype.UUID: + if v.Status == pgtype.Present { + return fmt.Sprintf("%x-%x-%x-%x-%x", + v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16]) + } + return nil + case [16]byte: // pgx/v5 returns UUIDs as [16]byte + return fmt.Sprintf("%x-%x-%x-%x-%x", + v[0:4], v[4:6], v[6:8], v[8:10], v[10:16]) + case pgxv5type.Time: // pgx/v5 returns "time without time zone" as pgtype.Time + if v.Valid { + usec := v.Microseconds + hours := usec / 3_600_000_000 + usec -= hours * 3_600_000_000 + minutes := usec / 60_000_000 + usec -= minutes * 60_000_000 + seconds := usec / 1_000_000 + usec -= seconds * 1_000_000 + return fmt.Sprintf("%02d:%02d:%02d.%06d", hours, minutes, seconds, usec) + } + return nil + case time.Time: + return v + case string: + return v + case int8, int16, int32, int64, int, + uint8, uint16, uint32, uint64, uint, + float32, float64, bool: + return v + case pgtype.JSON: + if v.Status == pgtype.Present { + return string(v.Bytes) + } + return nil + case pgtype.JSONB: + if v.Status == pgtype.Present { + return string(v.Bytes) + } + return nil + default: + if marshaler, ok := val.(json.Marshaler); ok { + if marshalled, err := marshaler.MarshalJSON(); err == nil { + return string(marshalled) + } + return fmt.Sprint(val) + } + if stringer, ok := val.(fmt.Stringer); ok { + return stringer.String() + } + return fmt.Sprint(val) + } +} + // NormalizeNumericString strips trailing fractional zeros from a numeric string, // acting as the Go-side equivalent of PostgreSQL's trim_scale(). // Examples: "3000.00" → "3000", "3000.10" → "3000.1", "3000" → "3000" diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index 8704599..4f8ca14 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -4,9 +4,11 @@ import ( "encoding/base64" "encoding/json" "fmt" + "sync" "testing" "time" + "github.com/jackc/pgtype" pgxv5type "github.com/jackc/pgx/v5/pgtype" "github.com/pgedge/ace/pkg/types" "github.com/stretchr/testify/require" @@ -61,6 +63,113 @@ func TestConvertToPgxType_FallbackStringer(t *testing.T) { require.Equal(t, "stringer:x", val) } +// A time value round-trips through the diff report as HH:MM:SS, so repair can +// convert it back into a typed parameter. +func TestNormalizeScannedValue_Time(t *testing.T) { + // 16:11:49 = 58309 s since midnight + val := NormalizeScannedValue(pgxv5type.Time{Microseconds: 58_309_000_000, Valid: true}) + require.Equal(t, "16:11:49.000000", val) + require.Nil(t, NormalizeScannedValue(pgxv5type.Time{Valid: false})) + + converted, err := ConvertToPgxType(val, "time without time zone") + require.NoError(t, err) + require.Equal(t, pgxv5type.Time{Microseconds: 58_309_000_000, Valid: true}, converted) +} + +// A pgx v5 UUID ([16]byte) becomes the canonical string form, not a JSON array. +func TestNormalizeScannedValue_UUID(t *testing.T) { + raw := [16]byte{0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0} + require.Equal(t, "12345678-9abc-def0-1234-56789abcdef0", NormalizeScannedValue(raw)) +} + +// Plain values and nil pass through unchanged. +func TestNormalizeScannedValue_Passthrough(t *testing.T) { + require.Nil(t, NormalizeScannedValue(nil)) + require.Equal(t, "abc", NormalizeScannedValue("abc")) + require.Equal(t, int64(42), NormalizeScannedValue(int64(42))) + ts := time.Date(2026, 7, 1, 10, 0, 0, 0, time.UTC) + require.Equal(t, ts, NormalizeScannedValue(ts)) +} + +// A pgx v5 interval struct becomes its Postgres text form. +func TestNormalizeScannedValue_Interval(t *testing.T) { + val := NormalizeScannedValue(pgxv5type.Interval{Days: 1, Microseconds: 7_384_000_000, Valid: true}) + s, ok := val.(string) + require.True(t, ok, "interval should normalise to a string, got %T", val) + require.Contains(t, s, "1 day") + require.Nil(t, NormalizeScannedValue(pgxv5type.Interval{Valid: false})) +} + +// Diff files written before values were normalised carry the raw pgx struct +// form for time; repair still accepts it. +func TestConvertToPgxType_TimeLegacyMapForm(t *testing.T) { + val, err := ConvertToPgxType(map[string]any{"Microseconds": float64(9_316_000_000), "Valid": true}, "time without time zone") + require.NoError(t, err) + require.Equal(t, pgxv5type.Time{Microseconds: 9_316_000_000, Valid: true}, val) + + val, err = ConvertToPgxType(map[string]any{"Microseconds": json.Number("9316000000"), "Valid": true}, "time") + require.NoError(t, err) + require.Equal(t, pgxv5type.Time{Microseconds: 9_316_000_000, Valid: true}, val) + + val, err = ConvertToPgxType(map[string]any{"Microseconds": float64(0), "Valid": false}, "time") + require.NoError(t, err) + require.Nil(t, val) +} + +// Money strings pass through as-is for Postgres to parse, without warnings. +func TestConvertToPgxType_Money(t *testing.T) { + val, err := ConvertToPgxType("$532.96", "money") + require.NoError(t, err) + require.Equal(t, "$532.96", val) +} + +// Concurrent normalisation is race-free (a pgtype.Map must not be shared +// across goroutines; run with -race). +func TestNormalizeScannedValue_ConcurrentInterval(t *testing.T) { + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + _ = NormalizeScannedValue(pgxv5type.Interval{Days: 2, Microseconds: 3_600_000_000, Valid: true}) + } + }() + } + wg.Wait() +} + +// Infinite timestamps and dates survive as their Postgres text form instead of +// collapsing to a zero time. +func TestNormalizeScannedValue_Infinity(t *testing.T) { + require.Equal(t, "infinity", NormalizeScannedValue(pgtype.Timestamp{Status: pgtype.Present, InfinityModifier: pgtype.Infinity})) + require.Equal(t, "-infinity", NormalizeScannedValue(pgtype.Timestamptz{Status: pgtype.Present, InfinityModifier: pgtype.NegativeInfinity})) + require.Equal(t, "-infinity", NormalizeScannedValue(pgtype.Date{Status: pgtype.Present, InfinityModifier: pgtype.NegativeInfinity})) +} + +// Raw bytea payloads pass through untouched (JSON base64) instead of being +// stringified into an unrepairable "[104 101 ...]" form. +func TestNormalizeScannedValue_ByteaPassthrough(t *testing.T) { + b := []byte{0x00, 0xff, 0x10, 0x7f} + val := NormalizeScannedValue(b) + require.Equal(t, b, val) + + // The full report round-trip: JSON-encode, decode, convert for repair. + encoded, err := json.Marshal(val) + require.NoError(t, err) + var decoded string + require.NoError(t, json.Unmarshal(encoded, &decoded)) + converted, err := ConvertToPgxType(decoded, "bytea") + require.NoError(t, err) + require.Equal(t, b, converted) +} + +// pgtype v1 JSON values normalise to their raw text without panicking. +func TestNormalizeScannedValue_JSONv1(t *testing.T) { + require.Equal(t, `{"a":1}`, NormalizeScannedValue(pgtype.JSON{Bytes: []byte(`{"a":1}`), Status: pgtype.Present})) + require.Nil(t, NormalizeScannedValue(pgtype.JSONB{Status: pgtype.Null})) +} + func TestNormalizeNumericString(t *testing.T) { tests := []struct { input string diff --git a/tests/integration/mtree_typed_repair_test.go b/tests/integration/mtree_typed_repair_test.go new file mode 100644 index 0000000..970ebfa --- /dev/null +++ b/tests/integration/mtree_typed_repair_test.go @@ -0,0 +1,131 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +package integration + +import ( + "context" + "fmt" + "path/filepath" + "slices" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" +) + +// seedTypedRepairTable creates the typed-columns table on both nodes and seeds +// rows on n1 only -- the diverged state a disabled subscription leaves behind. +func seedTypedRepairTable(t *testing.T, ctx context.Context, env *testEnv, safe string) { + t.Helper() + pools := []*pgxpool.Pool{env.N1Pool, env.N2Pool} + for _, pool := range pools { + _, err := pool.Exec(ctx, "CREATE TABLE IF NOT EXISTS "+safe+ // nosemgrep + " (id BIGINT PRIMARY KEY, name VARCHAR(100), col_time TIME, col_timetz TIMETZ,"+ + " col_money MONEY, col_uuid UUID, col_interval INTERVAL, col_num NUMERIC(12,4))") + require.NoError(t, err) + } + t.Cleanup(func() { + for _, pool := range pools { + _, _ = pool.Exec(ctx, "DROP TABLE IF EXISTS "+safe+" CASCADE") // nosemgrep + } + }) + + seedSQL := "INSERT INTO " + safe + ` (id, name, col_time, col_timetz, col_money, col_uuid, col_interval, col_num) + SELECT i, + 'name_' || i, + TIME '00:00:00' + (i * 137 || ' seconds')::interval, + TIMETZ '00:00:00+02' + (i * 91 || ' seconds')::interval, + (i * 13.37)::numeric::money, + ('00000000-0000-0000-0000-' || lpad(i::text, 12, '0'))::uuid, + (i || ' hours 30 minutes')::interval, + i * 1.5 + FROM generate_series(1, 100) AS i` + _, err := env.N1Pool.Exec(ctx, seedSQL) // nosemgrep + require.NoError(t, err) + for _, pool := range pools { + _, err := pool.Exec(ctx, "ANALYZE "+safe) // nosemgrep + require.NoError(t, err) + } +} + +// diffReportsFor snapshots the diff report files currently on disk for a table. +func diffReportsFor(t *testing.T, tableName string) map[string]struct{} { + t.Helper() + matches, err := filepath.Glob(fmt.Sprintf("%s_%s_diffs-*.json", testSchema, tableName)) + require.NoError(t, err) + seen := make(map[string]struct{}, len(matches)) + for _, m := range matches { + seen[m] = struct{}{} + } + return seen +} + +// newDiffReportSince returns the newest diff report written after the snapshot, +// so a repair never runs from a report left over by an earlier test. +func newDiffReportSince(t *testing.T, tableName string, seen map[string]struct{}) string { + t.Helper() + matches, err := filepath.Glob(fmt.Sprintf("%s_%s_diffs-*.json", testSchema, tableName)) + require.NoError(t, err) + matches = slices.DeleteFunc(matches, func(m string) bool { + _, ok := seen[m] + return ok + }) + require.NotEmpty(t, matches, "mtree diff should have written a diff report") + slices.Sort(matches) + return matches[len(matches)-1] +} + +// typedRepairFingerprint hashes a table's full ordered contents on one node. +func typedRepairFingerprint(t *testing.T, ctx context.Context, pool *pgxpool.Pool, safe string) (fp string) { + t.Helper() + require.NoError(t, pool.QueryRow(ctx, "SELECT COALESCE(md5(string_agg(tr::text, ',' ORDER BY id)), 'empty') FROM "+safe+" tr").Scan(&fp)) // nosemgrep + return +} + +// A diff produced by the Merkle-tree engine repairs cleanly on a table with +// time, timetz, money, uuid and interval columns, and the nodes converge. +func TestMtreeDiffRepairTypedColumns(t *testing.T) { + ctx := context.Background() + env := newSpockEnv() + + tableName := "typed_repair" + qualified := fmt.Sprintf("%s.%s", testSchema, tableName) + safe := pgx.Identifier{testSchema, tableName}.Sanitize() + seedTypedRepairTable(t, ctx, env, safe) + + mtreeTask := env.newMerkleTreeTask(t, qualified, []string{env.ServiceN1, env.ServiceN2}) + mtreeTask.BlockSize = 100 + mtreeTask.OverrideBlockSize = true + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { _ = mtreeTask.MtreeTeardown() }) + require.NoError(t, mtreeTask.BuildMtree()) + + // mtree table-diff must find the 100 n1-only rows and write a new report. + before := diffReportsFor(t, tableName) + require.NoError(t, mtreeTask.DiffMtree()) + total := 0 + for _, c := range mtreeTask.DiffResult.Summary.DiffRowsCount { + total += c + } + require.Equal(t, 100, total, "mtree diff should report the 100 divergent rows") + diffFile := newDiffReportSince(t, tableName, before) + + // The repair must complete -- previously the time column's serialized pgx + // struct aborted every upsert -- and converge the nodes. + repairTask := env.newTableRepairTask(env.ServiceN1, qualified, diffFile) + require.NoError(t, repairTask.Run(false), "table-repair from an mtree diff must succeed on typed columns") + + require.Equal(t, typedRepairFingerprint(t, ctx, env.N1Pool, safe), typedRepairFingerprint(t, ctx, env.N2Pool, safe), + "nodes must hold identical rows (all typed columns included) after repair") +}