Skip to content

Commit 60c44ed

Browse files
authored
Merge pull request #12 from deso-protocol/leader-schedule
Add LeaderScheduleEntry
2 parents d04a6ee + 83c472d commit 60c44ed

File tree

7 files changed

+203
-38
lines changed

7 files changed

+203
-38
lines changed

entries/epoch.go

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ type EpochEntry struct {
1414
InitialBlockHeight uint64
1515
InitialView uint64
1616
FinalBlockHeight uint64
17-
CreatedAtBlockTimestampNanoSecs uint64
18-
19-
BadgerKey []byte `pg:",pk,use_zero"`
17+
CreatedAtBlockTimestampNanoSecs int64
18+
SnapshotAtEpochNumber uint64
2019
}
2120

2221
type PGEpochEntry struct {
@@ -33,12 +32,19 @@ type PGEpochUtxoOps struct {
3332

3433
// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
3534
func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {
35+
36+
var snapshotAtEpochNumber uint64
37+
// Epochs use data snapshotted from two epochs ago. Epochs 0 and 1 use data from epoch 0.
38+
if epochEntry.EpochNumber >= 2 {
39+
snapshotAtEpochNumber = epochEntry.EpochNumber - 2
40+
}
3641
return EpochEntry{
37-
EpochNumber: epochEntry.EpochNumber,
38-
InitialBlockHeight: epochEntry.InitialBlockHeight,
39-
InitialView: epochEntry.InitialView,
40-
FinalBlockHeight: epochEntry.FinalBlockHeight,
41-
BadgerKey: keyBytes,
42+
EpochNumber: epochEntry.EpochNumber,
43+
InitialBlockHeight: epochEntry.InitialBlockHeight,
44+
InitialView: epochEntry.InitialView,
45+
FinalBlockHeight: epochEntry.FinalBlockHeight,
46+
CreatedAtBlockTimestampNanoSecs: epochEntry.CreatedAtBlockTimestampNanoSecs,
47+
SnapshotAtEpochNumber: snapshotAtEpochNumber,
4248
}
4349
}
4450

@@ -49,8 +55,11 @@ func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param
4955
// We also ensure before this that all entries have the same operation type.
5056
operationType := entries[0].OperationType
5157
var err error
58+
// Core only tracks the current epoch entry and never deletes them.
59+
// In order to track all historical epoch entries, we don't use the badger
60+
// key to uniquely identify them, but rather the epoch number.
5261
if operationType == lib.DbOperationTypeDelete {
53-
err = bulkDeleteEpochEntry(entries, db, operationType)
62+
return errors.Wrapf(err, "entries.EpochEntryBatchOperation: Delete operation type not supported")
5463
} else {
5564
err = bulkInsertEpochEntry(entries, db, operationType, params)
5665
}
@@ -76,31 +85,11 @@ func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
7685
query := db.NewInsert().Model(&pgEntrySlice)
7786

7887
if operationType == lib.DbOperationTypeUpsert {
79-
query = query.On("CONFLICT (badger_key) DO UPDATE")
88+
query = query.On("CONFLICT (epoch_number) DO UPDATE")
8089
}
8190

8291
if _, err := query.Returning("").Exec(context.Background()); err != nil {
8392
return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries")
8493
}
8594
return nil
8695
}
87-
88-
// bulkDeleteEpochEntry deletes a batch of locked stake entries from the database.
89-
func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
90-
// Track the unique entries we've inserted so we don't insert the same entry twice.
91-
uniqueEntries := consumer.UniqueEntries(entries)
92-
93-
// Transform the entries into a list of keys to delete.
94-
keysToDelete := consumer.KeysToDelete(uniqueEntries)
95-
96-
// Execute the delete query.
97-
if _, err := db.NewDelete().
98-
Model(&PGEpochEntry{}).
99-
Where("badger_key IN (?)", bun.In(keysToDelete)).
100-
Returning("").
101-
Exec(context.Background()); err != nil {
102-
return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
103-
}
104-
105-
return nil
106-
}

entries/pkid.go

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"github.com/deso-protocol/core/lib"
66
"github.com/deso-protocol/state-consumer/consumer"
7+
"github.com/golang/glog"
78
"github.com/pkg/errors"
89
"github.com/uptrace/bun"
910
)
@@ -25,18 +26,48 @@ type PGPkidEntryUtxoOps struct {
2526
UtxoOperation
2627
}
2728

29+
type LeaderScheduleEntry struct {
30+
SnapshotAtEpochNumber uint64 `pg:",use_zero"`
31+
LeaderIndex uint16 `pg:",use_zero"`
32+
ValidatorPKID string `pg:",use_zero"`
33+
BadgerKey []byte `pg:",pk,use_zero"`
34+
}
35+
36+
type PGLeaderScheduleEntry struct {
37+
bun.BaseModel `bun:"table:leader_schedule_entry"`
38+
LeaderScheduleEntry
39+
}
40+
2841
// Convert the Diamond DeSo encoder to the PG struct used by bun.
29-
func PkidEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *lib.DeSoParams) PkidEntry {
42+
func PkidEntryEncoderToPGStruct(pkidEntry *lib.PKIDEntry, keyBytes []byte, params *lib.DeSoParams) PkidEntry {
3043
return PkidEntry{
3144
Pkid: consumer.PublicKeyBytesToBase58Check(pkidEntry.PKID[:], params),
3245
PublicKey: consumer.PublicKeyBytesToBase58Check(pkidEntry.PublicKey[:], params),
3346
BadgerKey: keyBytes,
3447
}
3548
}
3649

50+
// Convert the leader schedule entry to the PG struct used by bun.
51+
func LeaderScheduleEncoderToPGStruct(validatorPKID *lib.PKID, keyBytes []byte, params *lib.DeSoParams,
52+
) *LeaderScheduleEntry {
53+
prefixRemovedKeyBytes := keyBytes[1:]
54+
if len(prefixRemovedKeyBytes) != 10 {
55+
glog.Errorf("LeaderScheduleEncoderToPGStruct: Invalid key length: %d", len(prefixRemovedKeyBytes))
56+
return nil
57+
}
58+
epochNumber := lib.DecodeUint64(prefixRemovedKeyBytes[:8])
59+
leaderIndex := lib.DecodeUint16(prefixRemovedKeyBytes[8:10])
60+
return &LeaderScheduleEntry{
61+
ValidatorPKID: consumer.PublicKeyBytesToBase58Check(validatorPKID[:], params),
62+
SnapshotAtEpochNumber: epochNumber,
63+
LeaderIndex: leaderIndex,
64+
BadgerKey: keyBytes,
65+
}
66+
}
67+
3768
// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
3869
// based on the operation type and executes it.
39-
func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
70+
func PkidEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
4071
// We check before we call this function that there is at least one operation type.
4172
// We also ensure before this that all entries have the same operation type.
4273
operationType := entries[0].OperationType
@@ -61,7 +92,7 @@ func bulkInsertPkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT
6192

6293
// Loop through the entries and convert them to PGPostEntry.
6394
for ii, entry := range uniqueEntries {
64-
pgEntrySlice[ii] = &PGPkidEntry{PkidEntry: PkidEncoderToPGStruct(entry.Encoder.(*lib.PKIDEntry), entry.KeyBytes, params)}
95+
pgEntrySlice[ii] = &PGPkidEntry{PkidEntry: PkidEntryEncoderToPGStruct(entry.Encoder.(*lib.PKIDEntry), entry.KeyBytes, params)}
6596
}
6697

6798
query := db.NewInsert().Model(&pgEntrySlice)
@@ -95,3 +126,75 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationT
95126

96127
return nil
97128
}
129+
130+
func PkidBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
131+
// We check before we call this function that there is at least one operation type.
132+
// We also ensure before this that all entries have the same operation type.
133+
operationType := entries[0].OperationType
134+
var err error
135+
if operationType == lib.DbOperationTypeDelete {
136+
err = bulkDeletePkid(entries, db, operationType)
137+
} else {
138+
err = bulkInsertPkid(entries, db, operationType, params)
139+
}
140+
if err != nil {
141+
return errors.Wrapf(err, "entries.PostBatchOperation: Problem with operation type %v", operationType)
142+
}
143+
return nil
144+
}
145+
146+
// bulkInsertPkid inserts a batch of PKIDs into the database.
147+
func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
148+
// Track the unique entries we've inserted so we don't insert the same entry twice.
149+
uniqueEntries := consumer.UniqueEntries(entries)
150+
151+
uniqueLeaderScheduleEntries := consumer.FilterEntriesByPrefix(
152+
uniqueEntries, lib.Prefixes.PrefixSnapshotLeaderSchedule)
153+
// NOTE: if we need to support parsing other indexes for PKIDs beyond LeaderSchedule,
154+
// we will need to filter the uniqueEntries by the appropriate prefix and then convert
155+
// the entries to the appropriate PG struct.
156+
// Create a new array to hold the bun struct.
157+
pgEntrySlice := make([]*PGLeaderScheduleEntry, len(uniqueLeaderScheduleEntries))
158+
159+
// Loop through the entries and convert them to PGPostEntry.
160+
for ii, entry := range uniqueLeaderScheduleEntries {
161+
leaderScheduleEntry := LeaderScheduleEncoderToPGStruct(entry.Encoder.(*lib.PKID), entry.KeyBytes, params)
162+
if leaderScheduleEntry == nil {
163+
glog.Errorf("bulkInsertPkid: Error converting LeaderScheduleEntry to PG struct")
164+
continue
165+
}
166+
pgEntrySlice[ii] = &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry}
167+
}
168+
169+
query := db.NewInsert().Model(&pgEntrySlice)
170+
171+
if operationType == lib.DbOperationTypeUpsert {
172+
query = query.On("CONFLICT (badger_key) DO UPDATE")
173+
}
174+
175+
if _, err := query.Returning("").Exec(context.Background()); err != nil {
176+
return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries")
177+
}
178+
return nil
179+
}
180+
181+
// bulkDeletePKID deletes a batch of PKIDs from the database.
182+
func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
183+
// Track the unique entries we've inserted so we don't insert the same entry twice.
184+
uniqueEntries := consumer.UniqueEntries(entries)
185+
186+
// Transform the entries into a list of keys to delete.
187+
keysToDelete := consumer.KeysToDelete(uniqueEntries)
188+
leaderSchedKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule)
189+
190+
// Execute the delete query.
191+
if _, err := db.NewDelete().
192+
Model(&LeaderScheduleEntry{}).
193+
Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)).
194+
Returning("").
195+
Exec(context.Background()); err != nil {
196+
return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries")
197+
}
198+
199+
return nil
200+
}

handler/data_handler.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
6363
case lib.EncoderTypePostAssociationEntry:
6464
err = entries.PostAssociationBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
6565
case lib.EncoderTypePKIDEntry:
66-
err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
66+
err = entries.PkidEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
6767
case lib.EncoderTypeDeSoBalanceEntry:
6868
err = entries.DesoBalanceBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
6969
case lib.EncoderTypeDAOCoinLimitOrderEntry:
@@ -86,6 +86,8 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
8686
err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
8787
case lib.EncoderTypeEpochEntry:
8888
err = entries.EpochEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
89+
case lib.EncoderTypePKID:
90+
err = entries.PkidBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
8991
}
9092

9193
if err != nil {

migrations/initial_migrations/20240129000003_create_epoch_entry_table.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@ import (
1212
func createEpochEntryTable(db *bun.DB, tableName string) error {
1313
_, err := db.Exec(strings.Replace(`
1414
CREATE TABLE {tableName} (
15-
epoch_number BIGINT NOT NULL,
15+
epoch_number BIGINT PRIMARY KEY NOT NULL,
1616
initial_block_height BIGINT NOT NULL,
1717
initial_view BIGINT NOT NULL,
1818
final_block_height BIGINT NOT NULL,
1919
created_at_block_timestamp_nano_secs BIGINT NOT NULL,
20-
21-
badger_key BYTEA PRIMARY KEY
20+
snapshot_at_epoch_number BIGINT NOT NULL
2221
);
22+
23+
CREATE INDEX {tableName}_epoch_number_idx ON {tableName} (epoch_number);
24+
CREATE INDEX {tableName}_initial_block_height_idx ON {tableName} (initial_block_height);
25+
CREATE INDEX {tableName}_final_block_height_idx ON {tableName} (final_block_height);
26+
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
2327
`, "{tableName}", tableName, -1))
2428
// TODO: What other fields do we need indexed?
2529
return err
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package initial_migrations
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"github.com/uptrace/bun"
8+
)
9+
10+
func createLeaderScheduleTable(db *bun.DB, tableName string) error {
11+
_, err := db.Exec(strings.Replace(`
12+
CREATE TABLE {tableName} (
13+
validator_pkid VARCHAR NOT NULL,
14+
snapshot_at_epoch_number BIGINT NOT NULL,
15+
leader_index INTEGER NOT NULL,
16+
badger_key BYTEA PRIMARY KEY NOT NULL
17+
);
18+
CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid);
19+
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
20+
CREATE INDEX {tableName}_snapshot_at_epoch_number_leader_index_idx ON {tableName} (snapshot_at_epoch_number, leader_index);
21+
`, "{tableName}", tableName, -1))
22+
return err
23+
}
24+
25+
func init() {
26+
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
27+
return createLeaderScheduleTable(db, "leader_schedule_entry")
28+
}, func(ctx context.Context, db *bun.DB) error {
29+
_, err := db.Exec(`
30+
DROP TABLE IF EXISTS leader_schedule_entry;
31+
`)
32+
if err != nil {
33+
return err
34+
}
35+
return nil
36+
})
37+
}

migrations/post_sync_migrations/20240213000002_create_pos_fk_comments.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ func init() {
2222
comment on column locked_stake_entry.badger_key is E'@omit';
2323
comment on column yield_curve_point.badger_key is E'@omit';
2424
comment on column locked_balance_entry.badger_key is E'@omit';
25-
comment on column epoch_entry.badger_key is E'@omit';
2625
comment on table transaction_partition_34 is E'@omit';
2726
comment on table transaction_partition_35 is E'@omit';
2827
comment on table transaction_partition_36 is E'@omit';
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package post_sync_migrations
2+
3+
import (
4+
"context"
5+
6+
"github.com/uptrace/bun"
7+
)
8+
9+
func init() {
10+
Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
11+
_, err := db.Exec(`
12+
comment on table leader_schedule_entry is E'@foreignKey (validator_pkid) references account (pkid)|@foreignFieldName leaderScheduleEntries|@fieldName leaderAccount\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName leaderScheduleEntries|@fieldName validatorEntry\n@foreignKey (snapshot_at_epoch_number) references epoch_entry (snapshot_at_epoch_number)|@foreignFieldName leaderScheduleEntries|@fieldName epochEntryBySnapshot';
13+
comment on column leader_schedule_entry.badger_key is E'@omit';
14+
`)
15+
if err != nil {
16+
return err
17+
}
18+
19+
return nil
20+
}, func(ctx context.Context, db *bun.DB) error {
21+
_, err := db.Exec(`
22+
comment on table leader_schedule_entry is NULL;
23+
comment on column leader_schedule_entry.badger_key is NULL;
24+
`)
25+
if err != nil {
26+
return err
27+
}
28+
29+
return nil
30+
})
31+
}

0 commit comments

Comments
 (0)