Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 1 addition & 120 deletions internal/consistency/diff/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v5"
pgxv5type "github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pgedge/ace/db/queries"
auth "github.com/pgedge/ace/internal/infra/db"
Expand Down Expand Up @@ -631,124 +629,7 @@ func (t *TableDiffTask) fetchRows(nodeName string, r Range) ([]types.OrderedMap,

rowData := make(types.OrderedMap, len(colsDesc))
for i, colD := range colsDesc {
val := rowValues[i]
var processedVal any
if val == nil {
processedVal = nil
} else {
switch v := val.(type) {
case pgtype.Numeric:
if v.Status == pgtype.Present {
text, err := v.EncodeText(nil, nil)
if err == nil {
processedVal = utils.NormalizeNumericString(string(text))
} else {
processedVal = v
}
} else {
processedVal = nil
}
case pgtype.Timestamp:
if v.Status == pgtype.Present {
processedVal = v.Time
} else {
processedVal = nil
}
case pgtype.Timestamptz:
if v.Status == pgtype.Present {
processedVal = v.Time
} else {
processedVal = nil
}
case pgtype.Date:
if v.Status == pgtype.Present {
processedVal = v.Time
} else {
processedVal = nil
}
case pgtype.Bytea:
if v.Status == pgtype.Present {
processedVal = v.Bytes
} else {
processedVal = nil
}
case pgtype.Interval:
if v.Status == pgtype.Present {
if encoded, err := v.EncodeText(nil, nil); err == nil {
processedVal = string(encoded)
} else {
processedVal = nil
}
} else {
processedVal = nil
}
case pgtype.UUID:
if v.Status == pgtype.Present {
processedVal = fmt.Sprintf("%x-%x-%x-%x-%x",
v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16])
} else {
processedVal = nil
}
case [16]byte: // pgx/v5 returns UUIDs as [16]byte
// nil caught above
processedVal = fmt.Sprintf("%x-%x-%x-%x-%x",
v[0:4], v[4:6], v[6:8], v[8:10], v[10:16])
case pgxv5type.Time: // pgx/v5 returns "time without time zone" as pgtype.Time
if v.Valid {
usec := v.Microseconds
hours := usec / 3_600_000_000
usec -= hours * 3_600_000_000
minutes := usec / 60_000_000
usec -= minutes * 60_000_000
seconds := usec / 1_000_000
usec -= seconds * 1_000_000
processedVal = fmt.Sprintf("%02d:%02d:%02d.%06d", hours, minutes, seconds, usec)
} else {
processedVal = nil
}
case time.Time:
processedVal = v
case string:
processedVal = v
case int8, int16, int32, int64, int,
uint8, uint16, uint32, uint64, uint,
float32, float64, bool:
processedVal = v
case pgtype.JSON, pgtype.JSONB:
if v.(interface{ GetStatus() pgtype.Status }).GetStatus() != pgtype.Present {
processedVal = nil
} else {
var dataHolder any
if assignable, ok := v.(interface{ AssignTo(dst any) error }); ok {
err := assignable.AssignTo(&dataHolder)
if err == nil {
if marshalled, mErr := json.Marshal(dataHolder); mErr == nil {
processedVal = string(marshalled)
} else {
processedVal = fmt.Sprint(dataHolder)
}
} else {
processedVal = nil
}
} else {
processedVal = nil
}
}
default:
if marshaler, ok := val.(json.Marshaler); ok {
if marshalled, err := marshaler.MarshalJSON(); err == nil {
processedVal = string(marshalled)
} else {
processedVal = fmt.Sprint(val)
}
} else if stringer, ok := val.(fmt.Stringer); ok {
processedVal = stringer.String()
} else {
processedVal = fmt.Sprint(val)
}
}
}
rowData[i] = types.KVPair{Key: string(colD.Name), Value: processedVal}
rowData[i] = types.KVPair{Key: string(colD.Name), Value: utils.NormalizeScannedValue(rowValues[i])}
}
results = append(results, rowData)
}
Expand Down
5 changes: 4 additions & 1 deletion internal/consistency/mtree/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,10 @@ func processRows(rows pgx.Rows) ([]types.OrderedMap, error) {
}
rowMap := make(types.OrderedMap, len(fields))
for i, field := range fields {
rowMap[i] = types.KVPair{Key: string(field.Name), Value: values[i]}
// Normalise driver structs (pgtype.Time, [16]byte UUIDs, ...) to
// the canonical diff-report representation; raw pgx values would
// serialise as JSON objects that table-repair cannot convert back.
rowMap[i] = types.KVPair{Key: string(field.Name), Value: utils.NormalizeScannedValue(values[i])}
}
results = append(results, rowMap)
}
Expand Down
164 changes: 164 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,22 @@ func ConvertToPgxType(val any, pgType string) (any, error) {
// Fallback: let Postgres cast from the raw string
return s, nil
}
// Diff files written before scanned values were normalised carry the
// raw pgx struct form {"Microseconds":...,"Valid":...}; accept it so
// those files remain repairable.
if m, ok := val.(map[string]any); ok {
if valid, ok := m["Valid"].(bool); ok && !valid {
return nil, nil
}
switch usec := m["Microseconds"].(type) {
case json.Number:
if u, err := usec.Int64(); err == nil {
return pgxv5type.Time{Microseconds: u, Valid: true}, nil
}
case float64:
return pgxv5type.Time{Microseconds: int64(usec), Valid: true}, nil
}
}
return nil, fmt.Errorf("expected time string (HH:MM:SS) for %s, got %v (%T)", pgType, val, val)

case "timetz", "time with time zone":
Expand All @@ -739,6 +755,21 @@ func ConvertToPgxType(val any, pgType string) (any, error) {
}
return nil, fmt.Errorf("expected time string for %s, got %v (%T)", pgType, val, val)

case "money":
// Both diff engines emit money as its locale-formatted string (e.g.
// "$532.96"), which Postgres parses directly on input; pass it through
// without the unknown-type warning.
if s, ok := val.(string); ok {
return s, nil
}
if n, ok := val.(json.Number); ok {
return n.String(), nil
}
if f, ok := val.(float64); ok {
return fmt.Sprint(f), nil
}
return nil, fmt.Errorf("expected money string for %s, got %v (%T)", pgType, val, val)

case "interval":
if s, ok := val.(string); ok {
return s, nil
Expand Down Expand Up @@ -1131,6 +1162,139 @@ func rowsToMap(rows []types.OrderedMap, pkeyCols []string) (map[string]types.Ord
return rowMap, nil
}

// timeOrInfinity renders a pgtype v1 timestamp/date value, preserving the
// infinity modifier as its Postgres text form so repair can round-trip it
// instead of collapsing it to a zero time.
func timeOrInfinity(t time.Time, inf pgtype.InfinityModifier) any {
switch inf {
case pgtype.Infinity:
return "infinity"
case pgtype.NegativeInfinity:
return "-infinity"
default:
return t
}
}

// NormalizeScannedValue converts a value scanned from a database row into the
// canonical representation ACE stores in diff reports: pgx driver structs
// (pgtype.Time, [16]byte UUIDs, intervals, numerics, ...) become plain strings
// or time.Time values that JSON-encode losslessly and that ConvertToPgxType can
// turn back into typed parameters during repair. Both diff engines must route
// scanned rows through it so a diff file is repairable regardless of which
// engine produced it.
func NormalizeScannedValue(val any) any {
if val == nil {
return nil
}
switch v := val.(type) {
case pgtype.Numeric:
if v.Status == pgtype.Present {
text, err := v.EncodeText(nil, nil)
if err == nil {
return NormalizeNumericString(string(text))
}
return v
}
return nil
case pgtype.Timestamp:
if v.Status == pgtype.Present {
return timeOrInfinity(v.Time, v.InfinityModifier)
}
return nil
case pgtype.Timestamptz:
if v.Status == pgtype.Present {
return timeOrInfinity(v.Time, v.InfinityModifier)
}
return nil
case pgtype.Date:
if v.Status == pgtype.Present {
return timeOrInfinity(v.Time, v.InfinityModifier)
}
return nil
Comment thread
danolivo marked this conversation as resolved.
case pgtype.Bytea:
if v.Status == pgtype.Present {
return v.Bytes
}
return nil
Comment thread
danolivo marked this conversation as resolved.
case []byte: // pgx/v5 scans bytea as a raw byte slice
// Pass it through untouched: JSON encodes it as base64, which the
// repair-side bytea conversion decodes back. The generic fmt.Sprint
// fallback would render "[104 101 ...]" and corrupt the column on
// repair.
return v
case pgtype.Interval:
if v.Status == pgtype.Present {
if encoded, err := v.EncodeText(nil, nil); err == nil {
return string(encoded)
}
return nil
}
return nil
case pgxv5type.Interval: // pgx/v5 scans interval as pgtype.Interval
if !v.Valid {
return nil
}
// A fresh Map per call: pgtype.Map memoizes encode plans without
// locking, so a shared instance would race under the concurrent diff
// workers. Interval normalisation is rare enough not to matter.
if encoded, err := pgxv5type.NewMap().Encode(pgxv5type.IntervalOID, pgxv5type.TextFormatCode, v, nil); err == nil {
return string(encoded)
}
return fmt.Sprint(v)
case pgtype.UUID:
if v.Status == pgtype.Present {
return fmt.Sprintf("%x-%x-%x-%x-%x",
v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16])
}
return nil
case [16]byte: // pgx/v5 returns UUIDs as [16]byte
return fmt.Sprintf("%x-%x-%x-%x-%x",
v[0:4], v[4:6], v[6:8], v[8:10], v[10:16])
case pgxv5type.Time: // pgx/v5 returns "time without time zone" as pgtype.Time
if v.Valid {
usec := v.Microseconds
hours := usec / 3_600_000_000
usec -= hours * 3_600_000_000
minutes := usec / 60_000_000
usec -= minutes * 60_000_000
seconds := usec / 1_000_000
usec -= seconds * 1_000_000
return fmt.Sprintf("%02d:%02d:%02d.%06d", hours, minutes, seconds, usec)
}
return nil
case time.Time:
return v
case string:
return v
case int8, int16, int32, int64, int,
uint8, uint16, uint32, uint64, uint,
float32, float64, bool:
return v
case pgtype.JSON:
if v.Status == pgtype.Present {
return string(v.Bytes)
}
return nil
case pgtype.JSONB:
if v.Status == pgtype.Present {
return string(v.Bytes)
}
return nil
default:
if marshaler, ok := val.(json.Marshaler); ok {
if marshalled, err := marshaler.MarshalJSON(); err == nil {
return string(marshalled)
}
return fmt.Sprint(val)
}
if stringer, ok := val.(fmt.Stringer); ok {
return stringer.String()
}
return fmt.Sprint(val)
}
}

// NormalizeNumericString strips trailing fractional zeros from a numeric string,
// acting as the Go-side equivalent of PostgreSQL's trim_scale().
// Examples: "3000.00" → "3000", "3000.10" → "3000.1", "3000" → "3000"
Expand Down
Loading
Loading