Skip to content

Commit 67b186a

Browse files
committed
Merge remote-tracking branch 'upstream/develop' into develop
2 parents 8975bab + 1aeb7fc commit 67b186a

19 files changed

Lines changed: 492 additions & 70 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## v4.6.7
9+
10+
* The previous fix for the ordering issue only worked at the block level and did not properly handle blocks batched together. The fix has now been extended to also work for operations inside a batch.
11+
812
## v4.6.6
913

1014
### `substreams-sink-sql run`

db_changes/db/db.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ type Loader struct {
4747

4848
testTx *TestTx // used for testing: if non-nil, 'loader.BeginTx()' will return this object instead of a real *sql.Tx
4949
dsn *DSN
50+
51+
batchOrdinal uint64 // Counter for ordinals within the current batch, resets on flush
5052
}
5153

5254
func NewLoader(
@@ -467,6 +469,13 @@ func (l *Loader) GetDSN() *DSN {
467469
return l.dsn
468470
}
469471

472+
// NextBatchOrdinal returns the next ordinal for the current batch and increments the counter
473+
func (l *Loader) NextBatchOrdinal() uint64 {
474+
ordinal := l.batchOrdinal
475+
l.batchOrdinal++
476+
return ordinal
477+
}
478+
470479
type obfuscatedString string
471480

472481
func (s obfuscatedString) String() string {

db_changes/db/flush.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,5 @@ func (l *Loader) reset() {
8282
for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() {
8383
l.entries.Set(entriesPair.Key, NewOrderedMap[string, *Operation]())
8484
}
85+
l.batchOrdinal = 0
8586
}

db_changes/db/operations.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,11 @@ func (o *Operation) mergeData(newData map[string]string) error {
9292
}
9393

9494
// mergeOperation merges otherData into this operation via mergeData; it returns an error if this operation is a delete
95-
func (o *Operation) mergeOperation(otherOrdinal uint64, otherData map[string]string) error {
95+
func (o *Operation) mergeOperation(otherData map[string]string) error {
9696
if o.opType == OperationTypeDelete {
9797
return fmt.Errorf("unable to merge operation for a delete operation")
9898
}
9999

100-
// Keep the lowest ordinal
101-
if otherOrdinal < o.ordinal {
102-
o.ordinal = otherOrdinal
103-
}
104-
105100
return o.mergeData(otherData)
106101
}
107102

db_changes/db/ops.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
// Insert a row in the DB, it is assumed the table exists, you can do a
1313
// check before with HasTable()
14-
func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
14+
func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error {
1515
uniqueID := createRowUniqueID(primaryKey)
1616

1717
if l.tracer.Enabled() {
@@ -59,7 +59,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map
5959
}
6060
}
6161

62-
entry.Set(uniqueID, l.newInsertOperation(table, primaryKey, data, ordinal, reversibleBlockNum))
62+
entry.Set(uniqueID, l.newInsertOperation(table, primaryKey, data, l.NextBatchOrdinal(), reversibleBlockNum))
6363
l.entriesCount++
6464
return nil
6565
}
@@ -101,7 +101,7 @@ func (l *Loader) GetPrimaryKey(tableName string, pk string) (map[string]string,
101101

102102
// Upsert a row in the DB, it is assumed the table exists, you can do a
103103
// check before with HasTable().
104-
func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
104+
func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error {
105105
if l.dialect.OnlyInserts() {
106106
return fmt.Errorf("update operation is not supported by the current database")
107107
}
@@ -147,7 +147,7 @@ func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map
147147
l.logger.Debug("primary key entry already exist for table, merging columns together", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
148148
}
149149

150-
op.mergeOperation(ordinal, data)
150+
op.mergeOperation(data)
151151
entry.Set(uniqueID, op)
152152
return nil
153153
} else {
@@ -165,13 +165,13 @@ func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map
165165
}
166166
}
167167

168-
entry.Set(uniqueID, l.newUpsertOperation(table, primaryKey, data, ordinal, reversibleBlockNum))
168+
entry.Set(uniqueID, l.newUpsertOperation(table, primaryKey, data, l.NextBatchOrdinal(), reversibleBlockNum))
169169
return nil
170170
}
171171

172172
// Update a row in the DB, it is assumed the table exists, you can do a
173173
// check before with HasTable()
174-
func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
174+
func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error {
175175
if l.dialect.OnlyInserts() {
176176
return fmt.Errorf("update operation is not supported by the current database")
177177
}
@@ -216,7 +216,7 @@ func (l *Loader) Update(tableName string, primaryKey map[string]string, data map
216216
l.logger.Debug("primary key entry already exist for table, merging fields together", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
217217
}
218218

219-
op.mergeOperation(ordinal, data)
219+
op.mergeOperation(data)
220220
entry.Set(uniqueID, op)
221221
return nil
222222
} else {
@@ -227,13 +227,13 @@ func (l *Loader) Update(tableName string, primaryKey map[string]string, data map
227227
l.logger.Debug("primary key entry never existed for table, adding update operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
228228
}
229229

230-
entry.Set(uniqueID, l.newUpdateOperation(table, primaryKey, data, ordinal, reversibleBlockNum))
230+
entry.Set(uniqueID, l.newUpdateOperation(table, primaryKey, data, l.NextBatchOrdinal(), reversibleBlockNum))
231231
return nil
232232
}
233233

234234
// Delete a row in the DB, it is assumed the table exists, you can do a
235235
// check before with HasTable()
236-
func (l *Loader) Delete(tableName string, primaryKey map[string]string, ordinal uint64, reversibleBlockNum *uint64) error {
236+
func (l *Loader) Delete(tableName string, primaryKey map[string]string, reversibleBlockNum *uint64) error {
237237
if l.dialect.OnlyInserts() {
238238
return fmt.Errorf("delete operation is not supported by the current database")
239239
}
@@ -274,6 +274,6 @@ func (l *Loader) Delete(tableName string, primaryKey map[string]string, ordinal
274274
l.logger.Debug("adding deleting operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName))
275275
}
276276

277-
entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey, ordinal, reversibleBlockNum))
277+
entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey, l.NextBatchOrdinal(), reversibleBlockNum))
278278
return nil
279279
}

db_changes/sinker/sinker.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
229229
}
230230

231231
func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges, blockNum, finalBlockNum uint64) error {
232-
for ordinal, change := range dbChanges.TableChanges {
232+
for _, change := range dbChanges.TableChanges {
233233
if !s.loader.HasTable(change.Table) {
234234
return fmt.Errorf(
235235
"your Substreams sent us a change for a table named %s we don't know about on %s (available tables: %s)",
@@ -265,22 +265,22 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges,
265265

266266
switch change.Operation {
267267
case pbdatabase.TableChange_OPERATION_CREATE:
268-
err := s.loader.Insert(change.Table, primaryKeys, changes, uint64(ordinal), reversibleBlockNum)
268+
err := s.loader.Insert(change.Table, primaryKeys, changes, reversibleBlockNum)
269269
if err != nil {
270270
return fmt.Errorf("database insert: %w", err)
271271
}
272272
case pbdatabase.TableChange_OPERATION_UPSERT:
273-
err := s.loader.Upsert(change.Table, primaryKeys, changes, uint64(ordinal), reversibleBlockNum)
273+
err := s.loader.Upsert(change.Table, primaryKeys, changes, reversibleBlockNum)
274274
if err != nil {
275275
return fmt.Errorf("database upsert: %w", err)
276276
}
277277
case pbdatabase.TableChange_OPERATION_UPDATE:
278-
err := s.loader.Update(change.Table, primaryKeys, changes, uint64(ordinal), reversibleBlockNum)
278+
err := s.loader.Update(change.Table, primaryKeys, changes, reversibleBlockNum)
279279
if err != nil {
280280
return fmt.Errorf("database update: %w", err)
281281
}
282282
case pbdatabase.TableChange_OPERATION_DELETE:
283-
err := s.loader.Delete(change.Table, primaryKeys, uint64(ordinal), reversibleBlockNum)
283+
err := s.loader.Delete(change.Table, primaryKeys, reversibleBlockNum)
284284
if err != nil {
285285
return fmt.Errorf("database delete: %w", err)
286286
}

db_proto/sql/click_house/accumulator_inserter.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,113 @@ func (i *AccumulatorInserter) insert(table string, values []any) error {
188188
input.Append(value.([]byte))
189189
case *proto.ColBool:
190190
input.Append(value.(bool))
191+
// Handle array column types
192+
case *proto.ColArr[int32]:
193+
if arr, ok := value.([]interface{}); ok {
194+
int32Arr := make([]int32, len(arr))
195+
for i, v := range arr {
196+
int32Arr[i] = v.(int32)
197+
}
198+
input.Append(int32Arr)
199+
} else {
200+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
201+
}
202+
case *proto.ColArr[int64]:
203+
if arr, ok := value.([]interface{}); ok {
204+
int64Arr := make([]int64, len(arr))
205+
for i, v := range arr {
206+
int64Arr[i] = v.(int64)
207+
}
208+
input.Append(int64Arr)
209+
} else {
210+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
211+
}
212+
case *proto.ColArr[uint32]:
213+
if arr, ok := value.([]interface{}); ok {
214+
uint32Arr := make([]uint32, len(arr))
215+
for i, v := range arr {
216+
uint32Arr[i] = v.(uint32)
217+
}
218+
input.Append(uint32Arr)
219+
} else {
220+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
221+
}
222+
case *proto.ColArr[uint64]:
223+
if arr, ok := value.([]interface{}); ok {
224+
uint64Arr := make([]uint64, len(arr))
225+
for i, v := range arr {
226+
uint64Arr[i] = v.(uint64)
227+
}
228+
input.Append(uint64Arr)
229+
} else {
230+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
231+
}
232+
case *proto.ColArr[float32]:
233+
if arr, ok := value.([]interface{}); ok {
234+
float32Arr := make([]float32, len(arr))
235+
for i, v := range arr {
236+
float32Arr[i] = v.(float32)
237+
}
238+
input.Append(float32Arr)
239+
} else {
240+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
241+
}
242+
case *proto.ColArr[float64]:
243+
if arr, ok := value.([]interface{}); ok {
244+
float64Arr := make([]float64, len(arr))
245+
for i, v := range arr {
246+
float64Arr[i] = v.(float64)
247+
}
248+
input.Append(float64Arr)
249+
} else {
250+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
251+
}
252+
case *proto.ColArr[bool]:
253+
if arr, ok := value.([]interface{}); ok {
254+
boolArr := make([]bool, len(arr))
255+
for i, v := range arr {
256+
boolArr[i] = v.(bool)
257+
}
258+
input.Append(boolArr)
259+
} else {
260+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
261+
}
262+
case *proto.ColArr[string]:
263+
if arr, ok := value.([]interface{}); ok {
264+
stringArr := make([]string, len(arr))
265+
for i, v := range arr {
266+
stringArr[i] = v.(string)
267+
}
268+
input.Append(stringArr)
269+
} else {
270+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
271+
}
272+
case *proto.ColArr[[]byte]:
273+
if arr, ok := value.([]interface{}); ok {
274+
bytesArr := make([][]byte, len(arr))
275+
for i, v := range arr {
276+
bytesArr[i] = v.([]byte)
277+
}
278+
input.Append(bytesArr)
279+
} else {
280+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
281+
}
282+
case *proto.ColArr[time.Time]:
283+
if arr, ok := value.([]interface{}); ok {
284+
timeArr := make([]time.Time, len(arr))
285+
for i, v := range arr {
286+
if t, ok := v.(*timestamppb.Timestamp); ok {
287+
timeArr[i] = t.AsTime()
288+
} else if t, ok := v.(time.Time); ok {
289+
timeArr[i] = t
290+
} else {
291+
panic(fmt.Sprintf("unknown time type %T in array for column %s of table %s", v, colName, table))
292+
}
293+
}
294+
input.Append(timeArr)
295+
} else {
296+
panic(fmt.Sprintf("expected []interface{} for array column %s of table %s, got %T", colName, table, value))
297+
}
191298
default:
192299
panic(fmt.Sprintf("unknown input type %T for column %s of table %s", input, colName, table))
193300
}

db_proto/sql/click_house/dialect.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,6 @@ func (d *DialectClickHouse) createTable(table *schema.Table) error {
114114

115115
fieldName := f.Name
116116

117-
switch {
118-
case f.IsRepeated:
119-
continue
120-
case f.IsMessage:
121-
case f.ForeignKey != nil:
122-
}
123-
124117
fieldType := MapFieldType(f.FieldDescriptor)
125118
sb.WriteString(fmt.Sprintf("%s %s", fieldName, fieldType))
126119
sb.WriteString(",")

0 commit comments

Comments
 (0)