-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathvalidator.go
More file actions
253 lines (210 loc) · 9.97 KB
/
validator.go
File metadata and controls
253 lines (210 loc) · 9.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package entries
import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/extra/bunbig"
)
// ValidatorEntry holds the column values for a single validator row. It is embedded in
// PGValidatorEntry and PGValidatorEntryUtxoOps and is populated by ValidatorEncoderToPGStruct.
//
// TODO: when to use nullzero vs use_zero?
//
// NOTE(review): TotalStakeAmountNanos and BadgerKey carry `pg:"..."` struct tags, while the
// models in this file are bun models. Presumably bun only reads the `bun` tag namespace, in
// which case `pk` / `use_zero` have no effect here — confirm against the bun struct-tag
// documentation and the table migrations before relying on them.
type ValidatorEntry struct {
// ValidatorPKID is the output of consumer.PublicKeyBytesToBase58Check for the validator's
// PKID; left empty when the source entry has no PKID.
ValidatorPKID string `bun:",nullzero"`
// Domains holds each domain of the source entry converted to a string.
Domains []string `bun:",array"`
DisableDelegatedStake bool
DelegatedStakeCommissionBasisPoints uint64
// VotingPublicKey is the ToString() form of the source entry's voting public key; left
// empty when the source entry has none.
VotingPublicKey string `bun:",nullzero"`
// VotingAuthorization is the ToString() form of the source entry's voting authorization;
// left empty when the source entry has none.
VotingAuthorization string `bun:",nullzero"`
// Use bunbig.Int to store the balance as a numeric in the pg database.
TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"`
LastActiveAtEpochNumber uint64
JailedAtEpochNumber uint64
// ExtraData is the source entry's extra data as converted by consumer.ExtraDataBytesToString.
ExtraData map[string]string `bun:"type:jsonb"`
// BadgerKey is the raw key the entry was stored under. The insert path uses the
// badger_key column as its ON CONFLICT target and the delete path filters on it.
BadgerKey []byte `pg:",pk,use_zero"`
}
// PGValidatorEntry is the bun model bound to the validator_entry table. It adds no columns
// of its own beyond the embedded ValidatorEntry.
type PGValidatorEntry struct {
bun.BaseModel `bun:"table:validator_entry"`
ValidatorEntry
}
// PGValidatorEntryUtxoOps is the bun model bound to the validator_entry_utxo_ops table. It
// combines the ValidatorEntry columns with the UtxoOperation columns (UtxoOperation is
// declared outside this file's visible portion). Nothing in the visible portion of this
// file references this type.
//
// TODO: Do I need this?
type PGValidatorEntryUtxoOps struct {
bun.BaseModel `bun:"table:validator_entry_utxo_ops"`
ValidatorEntry
UtxoOperation
}
// SnapshotValidatorEntry holds the column values for a single snapshot validator row. It
// has the same fields as ValidatorEntry plus SnapshotAtEpochNumber, and is populated by
// SnapshotValidatorEncoderToPGStruct.
//
// NOTE(review): TotalStakeAmountNanos, SnapshotAtEpochNumber and BadgerKey carry `pg:"..."`
// struct tags, while the models in this file are bun models. Presumably bun only reads the
// `bun` tag namespace, in which case `pk` / `use_zero` have no effect here — confirm against
// the bun struct-tag documentation and the table migrations before relying on them.
type SnapshotValidatorEntry struct {
// ValidatorPKID is the output of consumer.PublicKeyBytesToBase58Check for the validator's
// PKID; left empty when the source entry has no PKID.
ValidatorPKID string `bun:",nullzero"`
// Domains holds each domain of the source entry converted to a string.
Domains []string `bun:",array"`
DisableDelegatedStake bool
DelegatedStakeCommissionBasisPoints uint64
// VotingPublicKey is the ToString() form of the source entry's voting public key; left
// empty when the source entry has none.
VotingPublicKey string `bun:",nullzero"`
// VotingAuthorization is the ToString() form of the source entry's voting authorization;
// left empty when the source entry has none.
VotingAuthorization string `bun:",nullzero"`
// Use bunbig.Int to store the balance as a numeric in the pg database.
TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"`
LastActiveAtEpochNumber uint64
JailedAtEpochNumber uint64
// SnapshotAtEpochNumber is decoded with lib.DecodeUint64 from the 8 bytes of the badger
// key that follow its first byte.
SnapshotAtEpochNumber uint64 `pg:",use_zero"`
// ExtraData is the source entry's extra data as converted by consumer.ExtraDataBytesToString.
ExtraData map[string]string `bun:"type:jsonb"`
// BadgerKey is the raw key the entry was stored under. The insert path uses the
// badger_key column as its ON CONFLICT target and the delete path filters on it.
BadgerKey []byte `pg:",pk,use_zero"`
}
// PGSnapshotValidatorEntry is the bun model bound to the snapshot_validator_entry table. It
// adds no columns of its own beyond the embedded SnapshotValidatorEntry.
type PGSnapshotValidatorEntry struct {
bun.BaseModel `bun:"table:snapshot_validator_entry"`
SnapshotValidatorEntry
}
// ValidatorEncoderToPGStruct maps a core lib.ValidatorEntry, together with the badger key it
// was stored under, onto the flat ValidatorEntry row struct embedded by the bun models.
//
// ValidatorPKID, Domains, VotingPublicKey and VotingAuthorization are only populated when the
// corresponding source field is non-nil; otherwise the row keeps the Go zero value for them.
func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry {
	// Start with every field that is copied or converted unconditionally.
	row := ValidatorEntry{
		ExtraData:                           consumer.ExtraDataBytesToString(validatorEntry.ExtraData, params),
		BadgerKey:                           keyBytes,
		DisableDelegatedStake:               validatorEntry.DisableDelegatedStake,
		DelegatedStakeCommissionBasisPoints: validatorEntry.DelegatedStakeCommissionBasisPoints,
		TotalStakeAmountNanos:               bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig()),
		LastActiveAtEpochNumber:             validatorEntry.LastActiveAtEpochNumber,
		JailedAtEpochNumber:                 validatorEntry.JailedAtEpochNumber,
	}

	if pkid := validatorEntry.ValidatorPKID; pkid != nil {
		row.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*pkid)[:], params)
	}

	// A non-nil but empty Domains list must stay a non-nil empty slice, so allocate even
	// when there is nothing to copy.
	if rawDomains := validatorEntry.Domains; rawDomains != nil {
		domains := make([]string, 0, len(rawDomains))
		for _, rawDomain := range rawDomains {
			domains = append(domains, string(rawDomain))
		}
		row.Domains = domains
	}

	if votingKey := validatorEntry.VotingPublicKey; votingKey != nil {
		row.VotingPublicKey = votingKey.ToString()
	}
	if votingAuth := validatorEntry.VotingAuthorization; votingAuth != nil {
		row.VotingAuthorization = votingAuth.ToString()
	}

	return row
}
// ValidatorBatchOperation is the entry point for processing a batch of validator state
// change entries. Delete batches are routed to bulkDeleteValidatorEntry; every other
// operation type is routed to bulkInsertValidatorEntry.
//
// The caller must pass at least one entry, and all entries must share the same operation
// type: only the first entry's OperationType is inspected.
//
// Any handler error is returned wrapped with the operation type for context.
func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
	// The batch is homogeneous by contract, so the first entry is representative.
	opType := entries[0].OperationType

	var handlerErr error
	switch opType {
	case lib.DbOperationTypeDelete:
		handlerErr = bulkDeleteValidatorEntry(entries, db, opType, cachedEntries)
	default:
		handlerErr = bulkInsertValidatorEntry(entries, db, opType, params, cachedEntries)
	}

	if handlerErr == nil {
		return nil
	}
	return errors.Wrapf(handlerErr, "entries.ValidatorBatchOperation: Problem with operation type %v", opType)
}
// bulkInsertValidatorEntry inserts a batch of validator and snapshot validator entries into
// the database.
//
// The batch is first de-duplicated and stripped of entries already tracked in cachedEntries,
// then split by key prefix into validator entries (validator_entry table) and snapshot
// validator entries (snapshot_validator_entry table). When operationType is
// lib.DbOperationTypeUpsert, rows that conflict on badger_key are updated instead of
// rejected. After both inserts succeed, every remaining unique entry is added to the cache.
//
// An error is returned if an entry's encoder is not a non-nil *lib.ValidatorEntry, or if
// either insert query fails. In both cases the cache is left untouched.
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
	// Track the unique entries we've inserted so we don't insert the same entry twice.
	uniqueEntries := consumer.UniqueEntries(entries)
	// Filter out any entries that are already tracked in the cache.
	uniqueEntries = consumer.FilterCachedEntries(uniqueEntries, cachedEntries)

	uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID)
	uniqueSnapshotValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID)

	// Convert the validator entries to their bun model. The encoder type is checked
	// explicitly so that an unexpected entry surfaces as an error rather than a panic.
	pgEntrySlice := make([]*PGValidatorEntry, len(uniqueValidatorEntries))
	for ii, entry := range uniqueValidatorEntries {
		validatorEntry, ok := entry.Encoder.(*lib.ValidatorEntry)
		if !ok || validatorEntry == nil {
			return errors.Errorf(
				"entries.bulkInsertValidatorEntry: Unexpected encoder type %T for validator entry", entry.Encoder)
		}
		pgEntrySlice[ii] = &PGValidatorEntry{
			ValidatorEntry: ValidatorEncoderToPGStruct(validatorEntry, entry.KeyBytes, params),
		}
	}

	// Convert the snapshot validator entries the same way.
	pgSnapshotEntrySlice := make([]*PGSnapshotValidatorEntry, len(uniqueSnapshotValidatorEntries))
	for ii, entry := range uniqueSnapshotValidatorEntries {
		validatorEntry, ok := entry.Encoder.(*lib.ValidatorEntry)
		if !ok || validatorEntry == nil {
			return errors.Errorf(
				"entries.bulkInsertValidatorEntry: Unexpected encoder type %T for snapshot validator entry", entry.Encoder)
		}
		pgSnapshotEntrySlice[ii] = &PGSnapshotValidatorEntry{
			SnapshotValidatorEntry: SnapshotValidatorEncoderToPGStruct(validatorEntry, entry.KeyBytes, params),
		}
	}

	// Execute the insert query for validator entries.
	if len(pgEntrySlice) > 0 {
		query := db.NewInsert().Model(&pgEntrySlice)
		if operationType == lib.DbOperationTypeUpsert {
			query = query.On("CONFLICT (badger_key) DO UPDATE")
		}
		if _, err := query.Returning("").Exec(context.Background()); err != nil {
			return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting validator entries")
		}
	}

	// Execute the insert query for snapshot validator entries.
	if len(pgSnapshotEntrySlice) > 0 {
		query := db.NewInsert().Model(&pgSnapshotEntrySlice)
		if operationType == lib.DbOperationTypeUpsert {
			query = query.On("CONFLICT (badger_key) DO UPDATE")
		}
		if _, err := query.Returning("").Exec(context.Background()); err != nil {
			return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting snapshot validator entries")
		}
	}

	// Add any new entries to the cache. This only happens once both inserts have succeeded.
	for _, entry := range uniqueEntries {
		cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
	}

	return nil
}
// bulkDeleteValidatorEntry deletes a batch of validator and snapshot validator entries from
// the database and evicts their keys from cachedEntries.
//
// The batch is de-duplicated, reduced to the list of keys to delete, and split by key prefix
// into validator keys (validator_entry table) and snapshot validator keys
// (snapshot_validator_entry table). Each group's cache entries are evicted only after its
// delete query succeeds.
//
// operationType is not used by this function; it is kept so the signature stays unchanged
// for ValidatorBatchOperation.
//
// An error is returned if either delete query fails.
func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
	// De-duplicate the batch so each key is deleted at most once.
	uniqueEntries := consumer.UniqueEntries(entries)

	// Transform the entries into a list of keys to delete.
	uniqueKeys := consumer.KeysToDelete(uniqueEntries)
	validatorKeysToDelete := consumer.FilterKeysByPrefix(uniqueKeys, lib.Prefixes.PrefixValidatorByPKID)
	snapshotValidatorKeysToDelete := consumer.FilterKeysByPrefix(
		uniqueKeys,
		lib.Prefixes.PrefixSnapshotValidatorSetByPKID,
	)

	// Execute the delete query for validator entries.
	if len(validatorKeysToDelete) > 0 {
		if _, err := db.NewDelete().
			Model(&PGValidatorEntry{}).
			Where("badger_key IN (?)", bun.In(validatorKeysToDelete)).
			Returning("").
			Exec(context.Background()); err != nil {
			return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries")
		}

		// Delete cached validator entries.
		for _, key := range validatorKeysToDelete {
			cachedEntries.Remove(string(key))
		}
	}

	// Execute the delete query for snapshot validator entries.
	if len(snapshotValidatorKeysToDelete) > 0 {
		if _, err := db.NewDelete().
			Model(&PGSnapshotValidatorEntry{}).
			Where("badger_key IN (?)", bun.In(snapshotValidatorKeysToDelete)).
			Returning("").
			Exec(context.Background()); err != nil {
			return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting snapshot validator entries")
		}

		// Delete cached snapshot validator entries. The insert path caches snapshot entries
		// under the same string(key) form, so a deleted key left in the cache would be stale.
		for _, key := range snapshotValidatorKeysToDelete {
			cachedEntries.Remove(string(key))
		}
	}

	return nil
}
// SnapshotValidatorEncoderToPGStruct maps a core lib.ValidatorEntry stored under a snapshot
// key onto the flat SnapshotValidatorEntry row struct embedded by PGSnapshotValidatorEntry.
//
// ValidatorPKID, Domains, VotingPublicKey and VotingAuthorization are only populated when the
// corresponding source field is non-nil. SnapshotAtEpochNumber is decoded from keyBytes:
// the first byte is skipped and the next 8 bytes are passed to lib.DecodeUint64, so keyBytes
// is expected to be at least 9 bytes long.
func SnapshotValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) SnapshotValidatorEntry {
	var snapshot SnapshotValidatorEntry

	// Unconditional copies and conversions.
	snapshot.ExtraData = consumer.ExtraDataBytesToString(validatorEntry.ExtraData, params)
	snapshot.BadgerKey = keyBytes
	snapshot.DisableDelegatedStake = validatorEntry.DisableDelegatedStake
	snapshot.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints
	snapshot.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig())
	snapshot.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber
	snapshot.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber

	// Optional fields: only set when present on the source entry.
	if validatorEntry.ValidatorPKID != nil {
		pkidBytes := (*validatorEntry.ValidatorPKID)[:]
		snapshot.ValidatorPKID = consumer.PublicKeyBytesToBase58Check(pkidBytes, params)
	}
	if validatorEntry.Domains != nil {
		snapshot.Domains = make([]string, 0, len(validatorEntry.Domains))
		for idx := range validatorEntry.Domains {
			snapshot.Domains = append(snapshot.Domains, string(validatorEntry.Domains[idx]))
		}
	}
	if validatorEntry.VotingPublicKey != nil {
		snapshot.VotingPublicKey = validatorEntry.VotingPublicKey.ToString()
	}
	if validatorEntry.VotingAuthorization != nil {
		snapshot.VotingAuthorization = validatorEntry.VotingAuthorization.ToString()
	}

	// Drop the leading prefix byte of the key, then decode the following 8 bytes as the
	// epoch number the snapshot was taken at.
	keyAfterPrefix := keyBytes[1:]
	snapshot.SnapshotAtEpochNumber = lib.DecodeUint64(keyAfterPrefix[:8])

	return snapshot
}