Skip to content

Commit 75b4d0a

Browse files
authored
Skip unnecessary payload deserialization, batch attestation inserts, unify validator_indices, and clean up time constants (#174)
## Summary Follow-up cleanup from reviewing the devnet 3 merge (#107). Four targeted optimizations and deduplication fixes — no behavior changes. - Skip payload deserialization in `update_safe_target` (runs every slot) - Batch `insert_attestation_data_by_root` commits in `on_block_core` (runs every block) - Unify duplicated `aggregation_bits_to_validator_indices` into shared `validator_indices()` - Derive `MILLISECONDS_PER_SLOT` from base constants, remove redundant `SECONDS_PER_SLOT` ## Changes in detail ### 1. Key-only iterators for `update_safe_target` (`crates/storage/src/store.rs`, `crates/blockchain/src/store.rs`) `update_safe_target` calls `iter_known_aggregated_payloads()` and `iter_new_aggregated_payloads()`, which deserialize `Vec<StoredAggregatedPayload>` per entry (each containing multi-KB `AggregatedSignatureProof` objects). It then immediately discards the values and passes only the keys to `extract_latest_attestations`. **Fix:** Added `iter_known_aggregated_payload_keys()` and `iter_new_aggregated_payload_keys()` backed by a shared `iter_aggregated_payload_keys(table)` helper that decodes only the key bytes, skipping value deserialization entirely. `update_safe_target` now uses these through a `HashSet` for deduplication. This runs every slot (every 4 seconds) and scales with validator count. ### 2. Batch attestation data inserts in `on_block_core` (`crates/storage/src/store.rs`, `crates/blockchain/src/store.rs`) `on_block_core` called `insert_attestation_data_by_root` once per block body attestation plus once for the proposer attestation. Each call opened a separate write batch and committed — N+1 round-trips per block. **Fix:** Added `insert_attestation_data_by_root_batch` that writes all entries in a single batch-commit. `on_block_core` now collects all attestation data entries (body + proposer) and inserts them in one go. The existing `insert_attestation_data_by_root` is kept for the single-entry callers (`on_gossip_attestation`, `on_gossip_aggregated_attestation`). ### 3. Unify `aggregation_bits_to_validator_indices` (`crates/common/types/src/attestation.rs`, `crates/common/types/src/block.rs`, `crates/blockchain/src/store.rs`) `store.rs` had a free function `aggregation_bits_to_validator_indices(&AggregationBits) -> Vec<u64>` that was structurally identical to `AggregatedSignatureProof::participant_indices()` in `block.rs`. Both iterate a bitfield and collect set-bit indices. **Fix:** Added `validator_indices(&AggregationBits) -> impl Iterator<Item = u64>` in `ethlambda_types::attestation` (where `AggregationBits` is defined). `participant_indices()` now delegates to it. The local function in `store.rs` is removed; all 5 call sites use the shared function. ### 4. Derive `MILLISECONDS_PER_SLOT` and remove `SECONDS_PER_SLOT` (`crates/blockchain/src/lib.rs`, test files) Three independent time constants were defined: ```rust pub const SECONDS_PER_SLOT: u64 = 4; pub const MILLISECONDS_PER_SLOT: u64 = 4_000; pub const MILLISECONDS_PER_INTERVAL: u64 = 800; pub const INTERVALS_PER_SLOT: u64 = 5; ``` `SECONDS_PER_SLOT` was only used in two test files and was redundant. `MILLISECONDS_PER_SLOT` was independently defined rather than derived, creating a consistency risk if either base constant changes. **Fix:** Removed `SECONDS_PER_SLOT`. Made `MILLISECONDS_PER_SLOT = MILLISECONDS_PER_INTERVAL * INTERVALS_PER_SLOT`. Updated test callers (`forkchoice_spectests.rs`, `signature_spectests.rs`) to use `genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT`, matching the production arithmetic in `get_proposal_head`. ## How to test All existing tests cover these changes — no new behavior was introduced. ```bash cargo test --workspace --release cargo clippy --workspace --tests -- -D warnings cargo fmt --all -- --check ``` All 102 tests pass.
1 parent 5f6babd commit 75b4d0a

7 files changed

Lines changed: 83 additions & 45 deletions

File tree

crates/blockchain/src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,12 @@ pub struct BlockChain {
4141
handle: GenServerHandle<BlockChainServer>,
4242
}
4343

44-
/// Seconds in a slot.
45-
pub const SECONDS_PER_SLOT: u64 = 4;
46-
/// Milliseconds in a slot.
47-
pub const MILLISECONDS_PER_SLOT: u64 = 4_000;
4844
/// Milliseconds per interval (800ms ticks).
4945
pub const MILLISECONDS_PER_INTERVAL: u64 = 800;
5046
/// Number of intervals per slot (5 intervals of 800ms = 4 seconds).
5147
pub const INTERVALS_PER_SLOT: u64 = 5;
48+
/// Milliseconds in a slot (derived from interval duration and count).
49+
pub const MILLISECONDS_PER_SLOT: u64 = MILLISECONDS_PER_INTERVAL * INTERVALS_PER_SLOT;
5250
impl BlockChain {
5351
pub fn spawn(
5452
store: Store,

crates/blockchain/src/store.rs

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use ethlambda_types::{
99
ShortRoot,
1010
attestation::{
1111
AggregatedAttestation, AggregationBits, Attestation, AttestationData,
12-
SignedAggregatedAttestation, SignedAttestation,
12+
SignedAggregatedAttestation, SignedAttestation, validator_indices,
1313
},
1414
block::{
1515
AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody,
@@ -94,15 +94,15 @@ fn update_safe_target(store: &mut Store) {
9494
let min_target_score = (num_validators * 2).div_ceil(3);
9595

9696
let blocks = store.get_live_chain();
97-
// Merge both attestation pools. At interval 3 the migration (interval 4) hasn't
98-
// run yet, so attestations that entered "known" directly (proposer's own attestation
99-
// in block body, node's self-attestation) would be invisible without this merge.
100-
let mut all_payloads: HashMap<SignatureKey, Vec<StoredAggregatedPayload>> =
101-
store.iter_known_aggregated_payloads().collect();
102-
for (key, new_proofs) in store.iter_new_aggregated_payloads() {
103-
all_payloads.entry(key).or_default().extend(new_proofs);
104-
}
105-
let attestations = store.extract_latest_attestations(all_payloads.into_keys());
97+
// Merge both attestation pools (keys only — skip payload deserialization).
98+
// At interval 3 the migration (interval 4) hasn't run yet, so attestations
99+
// that entered "known" directly (proposer's own attestation in block body,
100+
// node's self-attestation) would be invisible without this merge.
101+
let all_keys: HashSet<SignatureKey> = store
102+
.iter_known_aggregated_payload_keys()
103+
.chain(store.iter_new_aggregated_payload_keys())
104+
.collect();
105+
let attestations = store.extract_latest_attestations(all_keys.into_iter());
106106
let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
107107
store.latest_justified().root,
108108
&blocks,
@@ -569,15 +569,16 @@ fn on_block_core(
569569

570570
// Process block body attestations.
571571
// Store attestation data by root and proofs in known aggregated payloads.
572+
let mut att_data_entries: Vec<(H256, AttestationData)> = Vec::new();
572573
let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new();
573574
for (att, proof) in aggregated_attestations
574575
.iter()
575576
.zip(attestation_signatures.iter())
576577
{
577578
let data_root = att.data.tree_hash_root();
578-
store.insert_attestation_data_by_root(data_root, att.data.clone());
579+
att_data_entries.push((data_root, att.data.clone()));
579580

580-
let validator_ids = aggregation_bits_to_validator_indices(&att.aggregation_bits);
581+
let validator_ids: Vec<_> = validator_indices(&att.aggregation_bits).collect();
581582
let payload = StoredAggregatedPayload {
582583
slot: att.data.slot,
583584
proof: proof.clone(),
@@ -588,19 +589,22 @@ fn on_block_core(
588589
metrics::inc_attestations_valid("block");
589590
}
590591
}
592+
593+
// Process proposer attestation as pending (enters "new" stage via gossip path)
594+
// The proposer's attestation should NOT affect this block's fork choice position.
595+
let proposer_vid = proposer_attestation.validator_id;
596+
let proposer_data_root = proposer_attestation.data.tree_hash_root();
597+
att_data_entries.push((proposer_data_root, proposer_attestation.data.clone()));
598+
599+
// Batch-insert all attestation data (body + proposer) in a single commit
600+
store.insert_attestation_data_by_root_batch(att_data_entries);
591601
store.insert_known_aggregated_payloads_batch(known_entries);
592602

593603
// Update forkchoice head based on new block and attestations
594604
// IMPORTANT: This must happen BEFORE processing proposer attestation
595605
// to prevent the proposer from gaining circular weight advantage.
596606
update_head(store, false);
597607

598-
// Process proposer attestation as pending (enters "new" stage via gossip path)
599-
// The proposer's attestation should NOT affect this block's fork choice position.
600-
let proposer_vid = proposer_attestation.validator_id;
601-
let proposer_data_root = proposer_attestation.data.tree_hash_root();
602-
store.insert_attestation_data_by_root(proposer_data_root, proposer_attestation.data.clone());
603-
604608
if !verify {
605609
// Without sig verification, insert directly with a dummy proof
606610
let participants = aggregation_bits_from_validator_indices(&[proposer_vid]);
@@ -888,15 +892,7 @@ pub enum StoreError {
888892
NotProposer { validator_index: u64, slot: u64 },
889893
}
890894

891-
/// Extract validator indices from aggregation bits.
892-
fn aggregation_bits_to_validator_indices(bits: &AggregationBits) -> Vec<u64> {
893-
bits.iter()
894-
.enumerate()
895-
.filter_map(|(i, bit)| if bit { Some(i as u64) } else { None })
896-
.collect()
897-
}
898-
899-
/// Extract validator indices from aggregation bits.
895+
/// Build an AggregationBits bitfield from a list of validator indices.
900896
fn aggregation_bits_from_validator_indices(bits: &[u64]) -> AggregationBits {
901897
if bits.is_empty() {
902898
return AggregationBits::with_capacity(0).expect("max capacity is non-zero");
@@ -1085,8 +1081,7 @@ fn select_aggregated_proofs(
10851081
let data = &aggregated.data;
10861082
let message = data.tree_hash_root();
10871083

1088-
let validator_ids = aggregation_bits_to_validator_indices(&aggregated.aggregation_bits);
1089-
let mut remaining: HashSet<u64> = validator_ids.into_iter().collect();
1084+
let mut remaining: HashSet<u64> = validator_indices(&aggregated.aggregation_bits).collect();
10901085

10911086
// Select existing proofs that cover the most remaining validators
10921087
while !remaining.is_empty() {
@@ -1104,8 +1099,7 @@ fn select_aggregated_proofs(
11041099
let (proof, covered) = candidates
11051100
.iter()
11061101
.map(|p| {
1107-
let covered: Vec<_> = aggregation_bits_to_validator_indices(&p.participants)
1108-
.into_iter()
1102+
let covered: Vec<_> = validator_indices(&p.participants)
11091103
.filter(|vid| remaining.contains(vid))
11101104
.collect();
11111105
(p, covered)
@@ -1161,7 +1155,7 @@ fn verify_signatures(
11611155

11621156
// Verify each attestation's signature proof
11631157
for (attestation, aggregated_proof) in attestations.iter().zip(attestation_signatures) {
1164-
let validator_ids = aggregation_bits_to_validator_indices(&attestation.aggregation_bits);
1158+
let validator_ids: Vec<_> = validator_indices(&attestation.aggregation_bits).collect();
11651159
if validator_ids.iter().any(|vid| *vid >= num_validators) {
11661160
return Err(StoreError::InvalidValidatorIndex);
11671161
}

crates/blockchain/tests/forkchoice_spectests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
sync::Arc,
55
};
66

7-
use ethlambda_blockchain::{SECONDS_PER_SLOT, store};
7+
use ethlambda_blockchain::{MILLISECONDS_PER_SLOT, store};
88
use ethlambda_storage::{Store, backend::InMemoryBackend};
99
use ethlambda_types::{
1010
attestation::{Attestation, AttestationData},
@@ -58,8 +58,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
5858

5959
let signed_block = build_signed_block(block_data);
6060

61-
let block_time_ms =
62-
(signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000;
61+
let block_time_ms = genesis_time * 1000
62+
+ signed_block.message.block.slot * MILLISECONDS_PER_SLOT;
6363

6464
// NOTE: the has_proposal argument is set to true, following the spec
6565
store::on_tick(&mut store, block_time_ms, true, false);

crates/blockchain/tests/signature_spectests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::path::Path;
22
use std::sync::Arc;
33

4-
use ethlambda_blockchain::{SECONDS_PER_SLOT, store};
4+
use ethlambda_blockchain::{MILLISECONDS_PER_SLOT, store};
55
use ethlambda_storage::{Store, backend::InMemoryBackend};
66
use ethlambda_types::{
77
block::{Block, SignedBlockWithAttestation},
@@ -51,7 +51,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
5151

5252
// Advance time to the block's slot
5353
let block_time_ms =
54-
(signed_block.message.block.slot * SECONDS_PER_SLOT + genesis_time) * 1000;
54+
genesis_time * 1000 + signed_block.message.block.slot * MILLISECONDS_PER_SLOT;
5555
store::on_tick(&mut st, block_time_ms, true, false);
5656

5757
// Process the block (this includes signature verification)

crates/common/types/src/attestation.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ pub struct AggregatedAttestation {
6464
/// in some collective action (attestation, signature aggregation, etc.).
6565
pub type AggregationBits = ssz_types::BitList<ValidatorRegistryLimit>;
6666

67+
/// Returns the indices of set bits in an `AggregationBits` bitfield as validator IDs.
68+
pub fn validator_indices(bits: &AggregationBits) -> impl Iterator<Item = u64> + '_ {
69+
bits.iter()
70+
.enumerate()
71+
.filter_map(|(i, bit)| if bit { Some(i as u64) } else { None })
72+
}
73+
6774
/// Aggregated attestation with its signature proof, used for gossip on the aggregation topic.
6875
#[derive(Debug, Clone, Encode, Decode)]
6976
pub struct SignedAggregatedAttestation {

crates/common/types/src/block.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use serde::Serialize;
22
use ssz_types::typenum::U1048576;
33

44
use crate::{
5-
attestation::{AggregatedAttestation, AggregationBits, Attestation, XmssSignature},
5+
attestation::{
6+
AggregatedAttestation, AggregationBits, Attestation, XmssSignature, validator_indices,
7+
},
68
primitives::{
79
ByteList, H256,
810
ssz::{Decode, Encode, TreeHash},
@@ -105,9 +107,7 @@ impl AggregatedSignatureProof {
105107

106108
/// Returns the validator indices that are set in the participants bitfield.
107109
pub fn participant_indices(&self) -> impl Iterator<Item = u64> + '_ {
108-
(0..self.participants.len())
109-
.filter(|&i| self.participants.get(i).unwrap_or(false))
110-
.map(|i| i as u64)
110+
validator_indices(&self.participants)
111111
}
112112
}
113113

crates/storage/src/store.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,22 @@ impl Store {
631631
batch.commit().expect("commit");
632632
}
633633

634+
/// Batch-insert multiple attestation data entries in a single commit.
635+
pub fn insert_attestation_data_by_root_batch(&mut self, entries: Vec<(H256, AttestationData)>) {
636+
if entries.is_empty() {
637+
return;
638+
}
639+
let mut batch = self.backend.begin_write().expect("write batch");
640+
let ssz_entries = entries
641+
.into_iter()
642+
.map(|(root, data)| (root.as_ssz_bytes(), data.as_ssz_bytes()))
643+
.collect();
644+
batch
645+
.put_batch(Table::AttestationDataByRoot, ssz_entries)
646+
.expect("put attestation data batch");
647+
batch.commit().expect("commit");
648+
}
649+
634650
/// Returns attestation data for the given root hash.
635651
pub fn get_attestation_data_by_root(&self, root: &H256) -> Option<AttestationData> {
636652
let view = self.backend.begin_read().expect("read view");
@@ -690,6 +706,12 @@ impl Store {
690706
self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads)
691707
}
692708

709+
/// Iterates over keys only from the known aggregated payloads table,
710+
/// skipping value deserialization.
711+
pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ {
712+
self.iter_aggregated_payload_keys(Table::LatestKnownAggregatedPayloads)
713+
}
714+
693715
/// Insert an aggregated payload into the known (fork-choice-active) table.
694716
pub fn insert_known_aggregated_payload(
695717
&mut self,
@@ -719,6 +741,12 @@ impl Store {
719741
self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads)
720742
}
721743

744+
/// Iterates over keys only from the new aggregated payloads table,
745+
/// skipping value deserialization.
746+
pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ {
747+
self.iter_aggregated_payload_keys(Table::LatestNewAggregatedPayloads)
748+
}
749+
722750
/// Insert an aggregated payload into the new (pending) table.
723751
pub fn insert_new_aggregated_payload(
724752
&mut self,
@@ -792,6 +820,17 @@ impl Store {
792820
entries.into_iter()
793821
}
794822

823+
fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator<Item = SignatureKey> {
824+
let view = self.backend.begin_read().expect("read view");
825+
let keys: Vec<_> = view
826+
.prefix_iterator(table, &[])
827+
.expect("iterator")
828+
.filter_map(|res| res.ok())
829+
.map(|(k, _)| decode_signature_key(&k))
830+
.collect();
831+
keys.into_iter()
832+
}
833+
795834
fn insert_aggregated_payload(
796835
&mut self,
797836
table: Table,

0 commit comments

Comments
 (0)