-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathblock.go
More file actions
141 lines (122 loc) · 5.08 KB
/
block.go
File metadata and controls
141 lines (122 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package entries
import (
"context"
"encoding/hex"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"time"
)
// BlockEntry is the flat database representation of a DeSo block header.
// Hash fields are stored as lowercase hex strings; BadgerKey holds the raw
// badger key bytes so rows can be matched back to state-change entries.
type BlockEntry struct {
	// BlockHash is the hex-encoded hash of the block and the primary key.
	BlockHash string `pg:",pk,use_zero"`
	// PrevBlockHash is the hex-encoded hash of the parent block.
	PrevBlockHash string
	// TxnMerkleRoot is the hex-encoded merkle root of the block's transactions.
	TxnMerkleRoot string
	// Timestamp is the block header timestamp converted to a time.Time.
	Timestamp time.Time
	// Height is the block height from the header.
	Height uint64
	// Nonce and ExtraNonce come from the proof-of-work header fields.
	Nonce uint64
	ExtraNonce uint64
	// BadgerKey is the raw key bytes this entry was synced from; used for deletes.
	BadgerKey []byte `pg:",use_zero"`
}
// PGBlockEntry wraps BlockEntry with bun metadata, mapping it to the
// "block" table for bulk insert/delete operations.
type PGBlockEntry struct {
	bun.BaseModel `bun:"table:block"`
	BlockEntry
}
// BlockEncoderToPGStruct converts the MsgDeSoBlock DeSo encoder to the PG struct used by bun.
// keyBytes is the raw badger key the block was synced from and is stored verbatim.
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte) *PGBlockEntry {
	// NOTE(review): the hash error is silently ignored; a nil blockHash here
	// would panic on the slice below — confirm Hash() cannot fail for synced blocks.
	blockHash, _ := block.Hash()
	return &PGBlockEntry{
		BlockEntry: BlockEntry{
			BlockHash: hex.EncodeToString(blockHash[:]),
			PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
			TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
			// Header stores seconds; convert to nanoseconds for time.Time.
			Timestamp: consumer.UnixNanoToTime(block.Header.TstampSecs * 1e9),
			Height: block.Header.Height,
			Nonce: block.Header.Nonce,
			ExtraNonce: block.Header.ExtraNonce,
			BadgerKey: keyBytes,
		},
	}
}
// BlockBatchOperation is the entry point for processing a batch of block entries. It determines the appropriate
// handler based on the operation type and executes it.
//
// Callers guarantee that entries is non-empty and that every entry shares the
// same operation type, so only the first entry's type is inspected.
func BlockBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
	// We check before we call this function that there is at least one operation type.
	// We also ensure before this that all entries have the same operation type.
	operationType := entries[0].OperationType
	var err error
	if operationType == lib.DbOperationTypeDelete {
		err = bulkDeleteBlockEntry(entries, db, operationType)
	} else {
		err = bulkInsertBlockEntry(entries, db, operationType, params)
	}
	if err != nil {
		// Fixed: the wrap message previously named PostBatchOperation, which
		// made error traces point at the wrong handler.
		return errors.Wrapf(err, "entries.BlockBatchOperation: Problem with operation type %v", operationType)
	}
	return nil
}
// bulkInsertBlockEntry inserts a batch of block entries, along with their
// transactions, into the database. Plain inserts are skipped (initial sync is
// handled by the utxo operations); upserts update on block_hash conflict.
func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
	// If this block is a part of the initial sync, skip it - it will be handled by the utxo operations.
	if operationType == lib.DbOperationTypeInsert {
		return nil
	}
	// Track the unique entries we've inserted so we don't insert the same entry twice.
	uniqueBlocks := consumer.UniqueEntries(entries)
	// Create new slices to hold the bun structs; blocks are pre-sized since
	// there is exactly one row per unique entry.
	pgBlockEntrySlice := make([]*PGBlockEntry, 0, len(uniqueBlocks))
	pgTransactionEntrySlice := make([]*PGTransactionEntry, 0)
	for _, entry := range uniqueBlocks {
		block := entry.Encoder.(*lib.MsgDeSoBlock)
		blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes)
		pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry)
		// Flatten each block's transactions so they can be bulk-inserted below.
		for jj, transaction := range block.Txns {
			pgTransactionEntry, err := TransactionEncoderToPGStruct(transaction, uint64(jj), blockEntry.BlockHash, blockEntry.Height, blockEntry.Timestamp, params)
			if err != nil {
				return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Problem converting transaction to PG struct")
			}
			pgTransactionEntrySlice = append(pgTransactionEntrySlice, pgTransactionEntry)
		}
	}
	blockQuery := db.NewInsert().Model(&pgBlockEntrySlice)
	if operationType == lib.DbOperationTypeUpsert {
		blockQuery = blockQuery.On("CONFLICT (block_hash) DO UPDATE")
	}
	if _, err := blockQuery.Exec(context.Background()); err != nil {
		return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Error inserting entries")
	}
	if err := bulkInsertTransactionEntry(pgTransactionEntrySlice, db, operationType); err != nil {
		return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Error inserting transaction entries")
	}
	return nil
}
// bulkDeleteBlockEntry deletes a batch of block entries from the database,
// along with the transactions that belong to those blocks.
func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
	// Track the unique entries so we don't attempt to delete the same entry twice.
	uniqueEntries := consumer.UniqueEntries(entries)
	// Transform the entries into a list of badger keys to delete.
	keysToDelete := consumer.KeysToDelete(uniqueEntries)
	// Execute the delete query on the blocks table.
	if _, err := db.NewDelete().
		Model(&PGBlockEntry{}).
		Where("badger_key IN (?)", bun.In(keysToDelete)).
		Returning("").
		Exec(context.Background()); err != nil {
		return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting entries")
	}
	// Derive the hex block hashes from the badger keys so the associated
	// transactions can be deleted by block_hash.
	blockHashHexesToDelete := make([]string, len(keysToDelete))
	for ii, keyToDelete := range keysToDelete {
		blockHashHexesToDelete[ii] = hex.EncodeToString(consumer.GetBlockHashBytesFromKey(keyToDelete))
	}
	// Delete any transactions associated with the block.
	// Fixed: this query previously targeted PGBlockEntry again, so associated
	// transaction rows were never removed from the transaction table.
	if _, err := db.NewDelete().
		Model(&PGTransactionEntry{}).
		Where("block_hash IN (?)", bun.In(blockHashHexesToDelete)).
		Returning("").
		Exec(context.Background()); err != nil {
		return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting transaction entries")
	}
	return nil
}