Skip to content

Commit 86bced2

Browse files
authored
Merge pull request #426 from Shopify/feature/meaningful-progress-report
Richer progress report
2 parents c49b3e5 + 5768a53 commit 86bced2

14 files changed

Lines changed: 217 additions & 137 deletions

CHANGELOG.md

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,33 @@
22

33
All notable changes to this project will be documented in this file.
44

5-
## [Unreleased]
5+
## [1.2.1 - ?]
6+
7+
### Added
8+
9+
- `PaginationKey` now includes column name
10+
11+
### Changed
12+
13+
- Use `PaginationKey` instead of raw `uint64` for progress report. This means that table progress report will
14+
include not raw value, but a whole `PaginationKey` object, i.e.
15+
```json
16+
{
17+
"type": "uint64",
18+
"column": "id",
19+
"value": 999
20+
}
21+
```
22+
which will be in line with the format of state dump. @driv3r #426
23+
24+
## [1.2.0 - 2026-02-06]
625

726
### Added
827

928
- Changelog.
10-
- Pagination keys beyond UINT64 @milanatshopify #417
11-
- Pagination keys other than UINT64 have to have binary collation @grodowski #422
29+
- UUID as ID: validate collation by @grodowski in #422
30+
- NewPaginationKeyFromRow refactor by @grodowski in #424
31+
- Pagination beyond uint64 by @milanatshopify in #417
1232

1333
## [1.1.0]
1434

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Variables to be built into the binary
2-
VERSION := 1.2.0
2+
VERSION := 1.2.1
33

44
# This variable can be overwritten by the caller
55
DATETIME ?= $(shell date -u +%Y%m%d%H%M%S)

control_server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type ControlServerTableStatus struct {
2020
TableName string
2121
PaginationKeyName string
2222
Status string
23-
LastSuccessfulPaginationKey uint64
24-
TargetPaginationKey uint64
23+
LastSuccessfulPaginationKey PaginationKey
24+
TargetPaginationKey PaginationKey
2525

2626
BatchSize uint64
2727
}
@@ -286,11 +286,11 @@ func (this *ControlServer) fetchStatus() *ControlServerStatus {
286286

287287
lastSuccessfulPaginationKey := tableProgress.LastSuccessfulPaginationKey
288288
if tableProgress.CurrentAction == TableActionWaiting {
289-
lastSuccessfulPaginationKey = 0
289+
lastSuccessfulPaginationKey = MinPaginationKey(this.F.Tables[name].GetPaginationColumn())
290290
}
291291
controlStatus := &ControlServerTableStatus{
292292
TableName: name,
293-
PaginationKeyName: this.F.Tables[name].GetPaginationColumn().Name,
293+
PaginationKeyName: lastSuccessfulPaginationKey.ColumnName(),
294294
Status: tableProgress.CurrentAction,
295295
LastSuccessfulPaginationKey: lastSuccessfulPaginationKey,
296296
TargetPaginationKey: tableProgress.TargetPaginationKey,

ferry.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (f *Ferry) NewDataIterator() *DataIterator {
110110
BatchSizePerTableOverride: f.Config.DataIterationBatchSizePerTableOverride,
111111
ReadRetries: f.Config.DBReadRetries,
112112
},
113-
StateTracker: f.StateTracker,
113+
StateTracker: f.StateTracker,
114114
TargetPaginationKeys: &sync.Map{},
115115
}
116116

@@ -993,9 +993,9 @@ func (f *Ferry) Progress() *Progress {
993993
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable()
994994

995995
s.Tables = make(map[string]TableProgress)
996-
targetPaginationKeys := make(map[string]uint64)
996+
targetPaginationKeys := make(map[string]PaginationKey)
997997
f.DataIterator.TargetPaginationKeys.Range(func(k, v interface{}) bool {
998-
targetPaginationKeys[k.(string)] = uint64(v.(PaginationKey).NumericPosition())
998+
targetPaginationKeys[k.(string)] = v.(PaginationKey)
999999
return true
10001000
})
10011001

@@ -1022,9 +1022,9 @@ func (f *Ferry) Progress() *Progress {
10221022

10231023
rowWrittenStats, _ := rowStatsWrittenPerTable[tableName]
10241024

1025-
var lastSuccessfulPaginationKey uint64
1025+
var lastSuccessfulPaginationKey PaginationKey
10261026
if lastSuccessfulPaginationKeyInterface != nil {
1027-
lastSuccessfulPaginationKey = uint64(lastSuccessfulPaginationKeyInterface.NumericPosition())
1027+
lastSuccessfulPaginationKey = lastSuccessfulPaginationKeyInterface
10281028
}
10291029

10301030
s.Tables[tableName] = TableProgress{
@@ -1042,7 +1042,7 @@ func (f *Ferry) Progress() *Progress {
10421042
var completedPaginationKeys uint64 = 0
10431043
estimatedPaginationKeysPerSecond := f.StateTracker.EstimatedPaginationKeysPerSecond()
10441044
for _, targetPaginationKey := range targetPaginationKeys {
1045-
totalPaginationKeysToCopy += targetPaginationKey
1045+
totalPaginationKeysToCopy += uint64(targetPaginationKey.NumericPosition())
10461046
}
10471047

10481048
for _, completedPaginationKey := range serializedState.LastSuccessfulPaginationKeys {

pagination_key.go

Lines changed: 79 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
type PaginationKey interface {
1818
// SQLValue returns the value to use in SQL WHERE clauses (e.g., WHERE id > ?).
1919
SQLValue() interface{}
20+
// ColumnName returns the column name this key belongs to, if known.
21+
ColumnName() string
2022
// Compare returns -1, 0, or 1 if this key is less than, equal to, or greater than other.
2123
Compare(other PaginationKey) int
2224
// NumericPosition returns a float64 approximation for progress tracking and estimation.
@@ -29,14 +31,31 @@ type PaginationKey interface {
2931
IsMax() bool
3032
}
3133

32-
type Uint64Key uint64
34+
type Uint64Key struct {
35+
Column string
36+
Value uint64
37+
}
38+
39+
type encodedKey struct {
40+
Type string `json:"type"`
41+
Value json.RawMessage `json:"value"`
42+
Column string `json:"column,omitempty"`
43+
}
3344

3445
func NewUint64Key(value uint64) Uint64Key {
35-
return Uint64Key(value)
46+
return Uint64Key{Value: value}
47+
}
48+
49+
func NewUint64KeyWithColumn(column string, value uint64) Uint64Key {
50+
return Uint64Key{Column: column, Value: value}
3651
}
3752

3853
func (k Uint64Key) SQLValue() interface{} {
39-
return uint64(k)
54+
return k.Value
55+
}
56+
57+
func (k Uint64Key) ColumnName() string {
58+
return k.Column
4059
}
4160

4261
func (k Uint64Key) Compare(other PaginationKey) int {
@@ -45,48 +64,71 @@ func (k Uint64Key) Compare(other PaginationKey) int {
4564
panic(fmt.Sprintf("cannot compare Uint64Key with %T", other))
4665
}
4766

48-
if k < otherKey {
67+
if k.Value < otherKey.Value {
4968
return -1
50-
} else if k > otherKey {
69+
} else if k.Value > otherKey.Value {
5170
return 1
5271
}
5372
return 0
5473
}
5574

5675
func (k Uint64Key) NumericPosition() float64 {
57-
return float64(k)
76+
return float64(k.Value)
5877
}
5978

6079
func (k Uint64Key) String() string {
61-
return fmt.Sprintf("%d", uint64(k))
80+
return fmt.Sprintf("%d", k.Value)
6281
}
6382

6483
func (k Uint64Key) IsMax() bool {
65-
return k == Uint64Key(math.MaxUint64)
84+
return k.Value == math.MaxUint64
6685
}
6786

6887
func (k Uint64Key) MarshalJSON() ([]byte, error) {
69-
return json.Marshal(uint64(k))
88+
valBytes, err := json.Marshal(k.Value)
89+
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
return json.Marshal(encodedKey{
95+
Type: "uint64",
96+
Value: valBytes,
97+
Column: k.Column,
98+
})
7099
}
71100

72-
type BinaryKey []byte
101+
type BinaryKey struct {
102+
Column string
103+
Value []byte
104+
}
73105

74106
func NewBinaryKey(value []byte) BinaryKey {
75107
clone := make([]byte, len(value))
76108
copy(clone, value)
77-
return BinaryKey(clone)
109+
return BinaryKey{Value: clone}
110+
}
111+
112+
func NewBinaryKeyWithColumn(column string, value []byte) BinaryKey {
113+
clone := make([]byte, len(value))
114+
copy(clone, value)
115+
return BinaryKey{Column: column, Value: clone}
78116
}
79117

80118
func (k BinaryKey) SQLValue() interface{} {
81-
return []byte(k)
119+
return k.Value
120+
}
121+
122+
func (k BinaryKey) ColumnName() string {
123+
return k.Column
82124
}
83125

84126
func (k BinaryKey) Compare(other PaginationKey) int {
85127
otherKey, ok := other.(BinaryKey)
86128
if !ok {
87129
panic(fmt.Sprintf("type mismatch: cannot compare BinaryKey with %T", other))
88130
}
89-
return bytes.Compare(k, otherKey)
131+
return bytes.Compare(k.Value, otherKey.Value)
90132
}
91133

92134
// NumericPosition calculates a rough float position for progress tracking.
@@ -98,29 +140,29 @@ func (k BinaryKey) Compare(other PaginationKey) int {
98140
//
99141
// The core pagination algorithm (using Compare()) is unaffected and works correctly with any binary data.
100142
func (k BinaryKey) NumericPosition() float64 {
101-
if len(k) == 0 {
143+
if len(k.Value) == 0 {
102144
return 0.0
103145
}
104146

105147
// Take up to the first 8 bytes to form a uint64 for estimation
106148
var buf [8]byte
107-
copy(buf[:], k)
149+
copy(buf[:], k.Value)
108150

109151
val := binary.BigEndian.Uint64(buf[:])
110152
return float64(val)
111153
}
112154

113155
func (k BinaryKey) String() string {
114-
return hex.EncodeToString(k)
156+
return hex.EncodeToString(k.Value)
115157
}
116158

117159
func (k BinaryKey) IsMax() bool {
118160
// We cannot know the true "Max" of a VARBINARY without knowing the length.
119161
// However, for UUID(16), we can check for FF...
120-
if len(k) == 0 {
162+
if len(k.Value) == 0 {
121163
return false
122164
}
123-
for _, b := range k {
165+
for _, b := range k.Value {
124166
if b != 0xFF {
125167
return false
126168
}
@@ -129,37 +171,16 @@ func (k BinaryKey) IsMax() bool {
129171
}
130172

131173
func (k BinaryKey) MarshalJSON() ([]byte, error) {
132-
return json.Marshal(hex.EncodeToString(k))
133-
}
134-
135-
type encodedKey struct {
136-
Type string `json:"type"`
137-
Value json.RawMessage `json:"value"`
138-
}
139-
140-
func MarshalPaginationKey(k PaginationKey) ([]byte, error) {
141-
var typeName string
142-
var valBytes []byte
143-
var err error
144-
145-
switch t := k.(type) {
146-
case Uint64Key:
147-
typeName = "uint64"
148-
valBytes, err = t.MarshalJSON()
149-
case BinaryKey:
150-
typeName = "binary"
151-
valBytes, err = t.MarshalJSON()
152-
default:
153-
return nil, fmt.Errorf("unknown pagination key type: %T", k)
154-
}
174+
valBytes, err := json.Marshal(hex.EncodeToString(k.Value))
155175

156176
if err != nil {
157177
return nil, err
158178
}
159179

160180
return json.Marshal(encodedKey{
161-
Type: typeName,
162-
Value: valBytes,
181+
Type: "binary",
182+
Value: valBytes,
183+
Column: k.Column,
163184
})
164185
}
165186

@@ -175,7 +196,9 @@ func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
175196
if err := json.Unmarshal(wrapper.Value, &i); err != nil {
176197
return nil, err
177198
}
178-
return NewUint64Key(i), nil
199+
key := NewUint64Key(i)
200+
key.Column = wrapper.Column
201+
return key, nil
179202
case "binary":
180203
var s string
181204
if err := json.Unmarshal(wrapper.Value, &s); err != nil {
@@ -185,7 +208,9 @@ func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
185208
if err != nil {
186209
return nil, err
187210
}
188-
return NewBinaryKey(b), nil
211+
key := NewBinaryKey(b)
212+
key.Column = wrapper.Column
213+
return key, nil
189214
default:
190215
return nil, fmt.Errorf("unknown key type: %s", wrapper.Type)
191216
}
@@ -194,21 +219,21 @@ func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
194219
func MinPaginationKey(column *schema.TableColumn) PaginationKey {
195220
switch column.Type {
196221
case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
197-
return NewUint64Key(0)
222+
return NewUint64KeyWithColumn(column.Name, 0)
198223
// Handle all potential binary/string types
199224
case schema.TYPE_BINARY, schema.TYPE_STRING:
200225
// The smallest value for any binary/string type is an empty slice.
201226
// Even for fixed BINARY(N), starting at empty ensures we catch [0x00, ...]
202-
return NewBinaryKey([]byte{})
227+
return NewBinaryKeyWithColumn(column.Name, []byte{})
203228
default:
204-
return NewUint64Key(0)
229+
return NewUint64KeyWithColumn(column.Name, 0)
205230
}
206231
}
207232

208233
func MaxPaginationKey(column *schema.TableColumn) PaginationKey {
209234
switch column.Type {
210235
case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
211-
return NewUint64Key(math.MaxUint64)
236+
return NewUint64KeyWithColumn(column.Name, math.MaxUint64)
212237
case schema.TYPE_BINARY, schema.TYPE_STRING:
213238
// SAFETY: Cap the size to prevent OOM on LONGBLOB (4GB).
214239
// InnoDB index limit is 3072 bytes. 4KB is a safe upper bound for a PK.
@@ -221,9 +246,9 @@ func MaxPaginationKey(column *schema.TableColumn) PaginationKey {
221246
for i := range maxBytes {
222247
maxBytes[i] = 0xFF
223248
}
224-
return NewBinaryKey(maxBytes)
249+
return NewBinaryKeyWithColumn(column.Name, maxBytes)
225250
default:
226-
return NewUint64Key(math.MaxUint64)
251+
return NewUint64KeyWithColumn(column.Name, math.MaxUint64)
227252
}
228253
}
229254

@@ -236,7 +261,7 @@ func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableCol
236261
if err != nil {
237262
return nil, fmt.Errorf("failed to get uint64 pagination key: %w", err)
238263
}
239-
return NewUint64Key(value), nil
264+
return NewUint64KeyWithColumn(column.Name, value), nil
240265

241266
case schema.TYPE_BINARY, schema.TYPE_STRING:
242267
valueInterface := rowData[index]
@@ -249,14 +274,14 @@ func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableCol
249274
default:
250275
return nil, fmt.Errorf("expected binary pagination key to be []byte or string, got %T", valueInterface)
251276
}
252-
return NewBinaryKey(valueBytes), nil
277+
return NewBinaryKeyWithColumn(column.Name, valueBytes), nil
253278

254279
default:
255280
// Fallback for other integer types
256281
value, err := rowData.GetUint64(index)
257282
if err != nil {
258283
return nil, fmt.Errorf("failed to get pagination key: %w", err)
259284
}
260-
return NewUint64Key(value), nil
285+
return NewUint64KeyWithColumn(column.Name, value), nil
261286
}
262287
}

0 commit comments

Comments
 (0)