-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtransaction.go
More file actions
198 lines (175 loc) · 7.48 KB
/
transaction.go
File metadata and controls
198 lines (175 loc) · 7.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package entries
import (
"context"
"encoding/hex"
"fmt"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"time"
)
// TransactionEntry is the database row representation of a single DeSo
// transaction. It is populated from a lib.MsgDeSoTxn by
// TransactionEncoderToPGStruct and persisted via bun.
//
// NOTE(review): the struct mixes legacy go-pg style `pg:` tags with bun
// `bun:` tags — confirm the consumer/migration layer honors both.
type TransactionEntry struct {
	// Hex-encoded transaction hash; primary key of the table.
	TransactionHash string `pg:",pk,use_zero"`
	// Base58Check encoding of the same hash (the user-facing txn id).
	TransactionId string `pg:",use_zero"`
	// Hash of the block containing this transaction; empty when the
	// transaction was processed outside of a block context.
	BlockHash string
	Version   uint16
	// Inputs/Outputs are stored as jsonb arrays of string maps
	// (txid/index and public_key/amount_nanos respectively).
	Inputs                     []map[string]string `bun:"type:jsonb"`
	Outputs                    []map[string]string `bun:"type:jsonb"`
	FeeNanos                   uint64
	NonceExpirationBlockHeight uint64
	NoncePartialId             uint64
	// Decoded transaction metadata, serialized to jsonb.
	TxnMeta                      lib.DeSoTxnMetadata `bun:"type:jsonb"`
	TxIndexMetadata              lib.DeSoEncoder     `bun:"type:jsonb"`
	TxIndexBasicTransferMetadata lib.DeSoEncoder     `bun:"type:jsonb"`
	// Raw byte serializations of the metadata and the full transaction.
	TxnMetaBytes []byte
	TxnBytes     []byte
	TxnType      uint16
	// Base58Check encoding of the transactor's public key.
	PublicKey string
	ExtraData map[string]string `bun:"type:jsonb"`
	Signature []byte
	// Position of the transaction within its block.
	IndexInBlock uint64
	BlockHeight  uint64
	Timestamp    time.Time `pg:",use_zero"`
	// Whether the transaction connected successfully.
	Connects bool
	// Original badger key; used as the delete key in bulkDeleteTransactionEntry.
	BadgerKey []byte `pg:",use_zero"`
}
// PGTransactionEntry binds TransactionEntry to the partitioned
// transaction table for bun queries.
type PGTransactionEntry struct {
	bun.BaseModel `bun:"table:transaction_partitioned"`
	TransactionEntry
}
// TransactionEncoderToPGStruct converts a core MsgDeSoTxn into the bun row
// struct used for inserts.
//
// Parameters:
//   - transaction: the decoded transaction to convert.
//   - blockIndex: position of the transaction within its block (0 when unknown).
//   - blockHash / blockHeight / timestamp: block context (zero values when the
//     transaction is processed outside of a block).
//   - connects: whether the transaction connected successfully.
//   - params: network params used for Base58Check encoding.
//
// Returns the populated row, or an error if the transaction or its metadata
// cannot be serialized.
func TransactionEncoderToPGStruct(
	transaction *lib.MsgDeSoTxn,
	blockIndex uint64,
	blockHash string,
	blockHeight uint64,
	timestamp time.Time,
	connects bool,
	params *lib.DeSoParams,
) (*PGTransactionEntry, error) {
	// Note: txInputs/txOutputs are intentionally left nil when empty so the
	// jsonb columns store null rather than [] (pre-sizing would change that).
	var txInputs []map[string]string
	for _, input := range transaction.TxInputs {
		txInputs = append(txInputs, map[string]string{
			"txid":  hex.EncodeToString(input.TxID[:]),
			"index": fmt.Sprintf("%d", input.Index),
		})
	}
	var txOutputs []map[string]string
	for _, output := range transaction.TxOutputs {
		txOutputs = append(txOutputs, map[string]string{
			"public_key":   consumer.PublicKeyBytesToBase58Check(output.PublicKey[:], params),
			"amount_nanos": fmt.Sprintf("%d", output.AmountNanos),
		})
	}
	txnMetaBytes, err := transaction.TxnMeta.ToBytes(false)
	if err != nil {
		return nil, errors.Wrapf(err, "TransactionEncoderToPGStruct: Problem converting txn meta to bytes")
	}
	txnBytes, err := transaction.ToBytes(false)
	if err != nil {
		return nil, errors.Wrapf(err, "TransactionEncoderToPGStruct: Problem converting txn to bytes")
	}
	// Compute the transaction hash once. The original recomputed this
	// double-SHA256 three times (hash, id, and badger key).
	txnHashBytes := transaction.Hash()[:]
	transactionEntry := &PGTransactionEntry{
		TransactionEntry: TransactionEntry{
			TransactionHash:            hex.EncodeToString(txnHashBytes),
			TransactionId:              consumer.PublicKeyBytesToBase58Check(txnHashBytes, params),
			BlockHash:                  blockHash,
			Version:                    uint16(transaction.TxnVersion),
			Inputs:                     txInputs,
			Outputs:                    txOutputs,
			FeeNanos:                   transaction.TxnFeeNanos,
			TxnMeta:                    transaction.TxnMeta,
			TxnMetaBytes:               txnMetaBytes,
			TxnBytes:                   txnBytes,
			TxnType:                    uint16(transaction.TxnMeta.GetTxnType()),
			PublicKey:                  consumer.PublicKeyBytesToBase58Check(transaction.PublicKey[:], params),
			ExtraData:                  consumer.ExtraDataBytesToString(transaction.ExtraData),
			IndexInBlock:               blockIndex,
			BlockHeight:                blockHeight,
			Timestamp:                  timestamp,
			Connects:                   connects,
			BadgerKey:                  txnHashBytes,
		},
	}
	// Nonce and signature are optional on a transaction.
	if transaction.TxnNonce != nil {
		transactionEntry.NonceExpirationBlockHeight = transaction.TxnNonce.ExpirationBlockHeight
		transactionEntry.NoncePartialId = transaction.TxnNonce.PartialID
	}
	if transaction.Signature.Sign != nil {
		transactionEntry.Signature = transaction.Signature.ToBytes()
	}
	return transactionEntry, nil
}
// TransactionBatchOperation is the entry point for processing a batch of transaction entries. It determines the appropriate handler
// based on the operation type and executes it.
func TransactionBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
	// We check before we call this function that there is at least one operation type.
	// We also ensure before this that all entries have the same operation type.
	operationType := entries[0].OperationType
	var err error
	if operationType == lib.DbOperationTypeDelete {
		err = bulkDeleteTransactionEntry(entries, db, operationType)
	} else {
		err = transformAndBulkInsertTransactionEntry(entries, db, operationType, params)
	}
	if err != nil {
		// Fixed: the wrap previously said "entries.PostBatchOperation",
		// a copy-paste from the post entry handler.
		return errors.Wrapf(err, "entries.TransactionBatchOperation: Problem with operation type %v", operationType)
	}
	return nil
}
// transformTransactionEntry converts a batch of state-change entries into bun
// row structs, deduplicating entries first. Transactions handled here are
// assumed to connect, since block context is unavailable.
func transformTransactionEntry(entries []*lib.StateChangeEntry, params *lib.DeSoParams) ([]*PGTransactionEntry, error) {
	// Track the unique entries we've inserted so we don't insert the same entry twice.
	uniqueTransactions := consumer.UniqueEntries(entries)
	// Pre-size the result: one row per unique entry.
	pgTransactionEntrySlice := make([]*PGTransactionEntry, 0, len(uniqueTransactions))
	for _, entry := range uniqueTransactions {
		transaction := entry.Encoder.(*lib.MsgDeSoTxn)
		// Assuming transactions connect when using this function. We can only
		// tell if a transaction connects or not if we have the block.
		transactionEntry, err := TransactionEncoderToPGStruct(transaction, 0, "", 0, time.Now(), true, params)
		if err != nil {
			// Fixed: the wrap previously named the wrong function
			// (transformAndBulkInsertTransactionEntry).
			return nil, errors.Wrapf(err, "entries.transformTransactionEntry: Problem converting transaction to PG struct")
		}
		pgTransactionEntrySlice = append(pgTransactionEntrySlice, transactionEntry)
	}
	return pgTransactionEntrySlice, nil
}
// bulkInsertTransactionEntry bulk-inserts the given rows. For upserts it
// updates on conflict with the (transaction_hash, txn_type) composite key.
func bulkInsertTransactionEntry(entries []*PGTransactionEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
	// Bulk insert the entries.
	transactionQuery := db.NewInsert().Model(&entries)
	if operationType == lib.DbOperationTypeUpsert {
		transactionQuery = transactionQuery.On("CONFLICT (transaction_hash, txn_type) DO UPDATE")
	}
	if _, err := transactionQuery.Exec(context.Background()); err != nil {
		// Fixed: the wrap previously said "entries.bulkInsertTransaction"
		// (wrong function name).
		return errors.Wrapf(err, "entries.bulkInsertTransactionEntry: Error inserting entries")
	}
	return nil
}
// transformAndBulkInsertTransactionEntry converts a batch of state-change
// entries into transaction rows and bulk-inserts them into the database.
func transformAndBulkInsertTransactionEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
	// First transform the raw entries into bun row structs.
	rows, err := transformTransactionEntry(entries, params)
	if err != nil {
		return errors.Wrapf(err, "entries.transformAndBulkInsertTransactionEntry: Problem transforming transaction entries")
	}
	// Then write them out in a single bulk insert.
	if err = bulkInsertTransactionEntry(rows, db, operationType); err != nil {
		return errors.Wrapf(err, "entries.transformAndBulkInsertTransactionEntry: Problem inserting transaction entries")
	}
	return nil
}
// bulkDeleteTransactionEntry deletes a batch of transaction entries from the database,
// keyed by their original badger keys.
func bulkDeleteTransactionEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
	// Track the unique entries we've inserted so we don't insert the same entry twice.
	uniqueEntries := consumer.UniqueEntries(entries)
	// Transform the entries into a list of keys to delete.
	keysToDelete := consumer.KeysToDelete(uniqueEntries)
	// Execute the delete query.
	if _, err := db.NewDelete().
		Model(&PGTransactionEntry{}).
		Where("badger_key IN (?)", bun.In(keysToDelete)).
		Returning("").
		Exec(context.Background()); err != nil {
		// Fixed: the wrap previously said "entries.bulkDeleteBlockEntry",
		// a copy-paste from the block entry handler.
		return errors.Wrapf(err, "entries.bulkDeleteTransactionEntry: Error deleting entries")
	}
	return nil
}