Skip to content

Commit bbb9165

Browse files
Port subnet network topology from leanSpec PR #482 (#249)
## Motivation leanSpec PR #482 (merged 2025-03-25) introduces `--aggregate-subnet-ids` and changes how nodes subscribe to attestation subnets. The key architectural shift: **subnet filtering moves from the fork choice store to the P2P subscription layer**. Before this PR, ethlambda had several gaps: - Only the first validator's subnet was used (multi-validator nodes all published/subscribed to one subnet) - The store had a **hardcoded** `ATTESTATION_COMMITTEE_COUNT = 1` constant (ignored the CLI parameter) - The store performed per-attestation subnet filtering that the spec removed - No `--aggregate-subnet-ids` CLI flag existed - Non-aggregator validators didn't subscribe to their subnet (should, for gossipsub mesh health) ## Description ### Phase 1: CLI (`bin/ethlambda/src/main.rs`) - Add `--aggregate-subnet-ids` CLI flag (comma-separated, `requires = "is_aggregator"` via clap) - Collect **all** validator IDs instead of just the first one - Pass `validator_ids: Vec<u64>` and `aggregate_subnet_ids` to `SwarmConfig` ### Phase 2: P2P layer (`crates/net/p2p/src/lib.rs`, `crates/net/p2p/src/gossipsub/handler.rs`) - `SwarmConfig`: `validator_id: Option<u64>` → `validator_ids: Vec<u64>` + `aggregate_subnet_ids: Option<Vec<u64>>` - `BuiltSwarm` / `P2PServer`: single `attestation_topic` → `attestation_topics: HashMap<u64, IdentTopic>` + `attestation_committee_count: u64` - **Multi-subnet subscription logic**: - All nodes with validators subscribe to their validator subnets (for gossipsub mesh health) - Aggregators additionally subscribe to any explicitly requested `--aggregate-subnet-ids` - Aggregator with no validators and no explicit subnets: fallback to subnet 0 - Non-validator non-aggregator nodes: no attestation subscriptions - `publish_attestation`: routes per-validator to the correct subnet topic (`validator_id % committee_count`); if the subnet isn't subscribed, constructs the topic on-the-fly for gossipsub fanout - Metric `lean_attestation_committee_subnet` reports the lowest validator subnet (backward-compatible) ### Phase 3: Store simplification (`crates/blockchain/src/store.rs`) - **Removed** `ATTESTATION_COMMITTEE_COUNT` constant and `compute_subnet_id()` helper - `on_gossip_attestation`: removed `local_validator_ids` parameter; stores gossip signatures **unconditionally** (subnet filtering already handled at P2P layer) - `on_block` / `on_block_core`: removed `local_validator_ids` parameter; stores proposer signature **unconditionally** ### Phase 4: Caller updates (`crates/blockchain/src/lib.rs`, `crates/blockchain/tests/signature_spectests.rs`) - Updated all call sites to match simplified store function signatures ## How to test ```bash make fmt # ✅ passes make lint # ✅ passes make test # ✅ all 97 tests pass (forkchoice + signature + STF spec tests) ``` For devnet testing: ```bash # Single subnet (existing behavior, unchanged) --attestation-committee-count 1 --is-aggregator # Multi-subnet with explicit aggregator subscription --attestation-committee-count 2 --is-aggregator --aggregate-subnet-ids 0,1 # Multi-validator node (subscribes to all validator subnets automatically) --attestation-committee-count 4 --is-aggregator ``` Verify via logs: - Look for `Subscribed to attestation subnet` lines (one per subscribed subnet) - `Published attestation to gossipsub` now includes `subnet_id` field - Gossip signatures are stored without subnet filtering (no more silent drops) ## Notes - **Non-aggregator subscription is new behavior**: previously non-aggregators never subscribed to attestation subnets. Now they subscribe for mesh health. This slightly increases bandwidth but matches the spec. - **Hardcoded `"devnet0"` network name**: existing limitation carried forward in the fanout topic construction. Should be addressed separately. - **Subnet metric**: `lean_attestation_committee_subnet` is a single gauge. With multiple subnets, we report the lowest validator's subnet for backward compatibility. Can be improved in a follow-up. ## Related - leanSpec PR #482 (upstream spec change) --------- Co-authored-by: Tomás Grüner <47506558+MegaRedHand@users.noreply.github.com>
1 parent 7cc7851 commit bbb9165

8 files changed

Lines changed: 95 additions & 82 deletions

File tree

bin/ethlambda/src/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ struct CliOptions {
6969
/// Number of attestation committees (subnets) per slot
7070
#[arg(long, default_value = "1", value_parser = clap::value_parser!(u64).range(1..))]
7171
attestation_committee_count: u64,
72+
/// Subnet IDs this aggregator should subscribe to (comma-separated).
73+
/// Requires --is-aggregator. Defaults to the subnets of the node's validators.
74+
#[arg(long, value_delimiter = ',', requires = "is_aggregator")]
75+
aggregate_subnet_ids: Option<Vec<u64>>,
7276
/// Directory for RocksDB storage
7377
#[arg(long, default_value = "./data")]
7478
data_dir: PathBuf,
@@ -146,17 +150,17 @@ async fn main() -> eyre::Result<()> {
146150
.await
147151
.inspect_err(|err| error!(%err, "Failed to initialize state"))?;
148152

149-
// Use first validator ID for subnet subscription
150-
let first_validator_id = validator_keys.keys().min().copied();
153+
let validator_ids: Vec<u64> = validator_keys.keys().copied().collect();
151154
let blockchain = BlockChain::spawn(store.clone(), validator_keys, options.is_aggregator);
152155

153156
let built = build_swarm(SwarmConfig {
154157
node_key: node_p2p_key,
155158
bootnodes,
156159
listening_socket: p2p_socket,
157-
validator_id: first_validator_id,
160+
validator_ids,
158161
attestation_committee_count: options.attestation_committee_count,
159162
is_aggregator: options.is_aggregator,
163+
aggregate_subnet_ids: options.aggregate_subnet_ids,
160164
})
161165
.expect("failed to build swarm");
162166

crates/blockchain/src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,7 @@ impl BlockChainServer {
279279
&mut self,
280280
signed_block: SignedBlockWithAttestation,
281281
) -> Result<(), StoreError> {
282-
let validator_ids = self.key_manager.validator_ids();
283-
store::on_block(&mut self.store, signed_block, &validator_ids)?;
282+
store::on_block(&mut self.store, signed_block)?;
284283
metrics::update_head_slot(self.store.head_slot());
285284
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
286285
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
@@ -454,8 +453,7 @@ impl BlockChainServer {
454453
warn!("Received unaggregated attestation but node is not an aggregator");
455454
return;
456455
}
457-
let validator_ids = self.key_manager.validator_ids();
458-
let _ = store::on_gossip_attestation(&mut self.store, attestation, &validator_ids)
456+
let _ = store::on_gossip_attestation(&mut self.store, attestation)
459457
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
460458
}
461459

crates/blockchain/src/store.rs

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,6 @@ use crate::{INTERVALS_PER_SLOT, MILLISECONDS_PER_INTERVAL, MILLISECONDS_PER_SLOT
2626

2727
const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3;
2828

29-
/// Number of attestation committees per slot.
30-
/// With ATTESTATION_COMMITTEE_COUNT = 1, all validators are in subnet 0.
31-
const ATTESTATION_COMMITTEE_COUNT: u64 = 1;
32-
33-
/// Compute the attestation subnet ID for a validator.
34-
#[allow(clippy::modulo_one)]
35-
fn compute_subnet_id(validator_id: u64) -> u64 {
36-
validator_id % ATTESTATION_COMMITTEE_COUNT
37-
}
38-
3929
/// Accept new aggregated payloads, promoting them to known for fork choice.
4030
fn accept_new_attestations(store: &mut Store, log_tree: bool) {
4131
store.promote_new_aggregated_payloads();
@@ -367,7 +357,6 @@ pub fn on_tick(
367357
pub fn on_gossip_attestation(
368358
store: &mut Store,
369359
signed_attestation: SignedAttestation,
370-
local_validator_ids: &[u64],
371360
) -> Result<(), StoreError> {
372361
let validator_id = signed_attestation.validator_id;
373362
let attestation = Attestation {
@@ -407,16 +396,10 @@ pub fn on_gossip_attestation(
407396
// Store attestation data by root (content-addressed, idempotent)
408397
store.insert_attestation_data_by_root(data_root, attestation.data.clone());
409398

410-
// Store gossip signature for later aggregation at interval 2,
411-
// only if the attester is in the same subnet as one of our validators.
412-
let attester_subnet = compute_subnet_id(validator_id);
413-
let in_our_subnet = local_validator_ids
414-
.iter()
415-
.any(|&vid| compute_subnet_id(vid) == attester_subnet);
416-
if in_our_subnet {
417-
store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature);
418-
metrics::update_gossip_signatures(store.gossip_signatures_count());
419-
}
399+
// Store gossip signature unconditionally for later aggregation at interval 2.
400+
// Subnet filtering is handled at the P2P subscription layer.
401+
store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature);
402+
metrics::update_gossip_signatures(store.gossip_signatures_count());
420403

421404
metrics::inc_attestations_valid();
422405

@@ -524,9 +507,8 @@ pub fn on_gossip_aggregated_attestation(
524507
pub fn on_block(
525508
store: &mut Store,
526509
signed_block: SignedBlockWithAttestation,
527-
local_validator_ids: &[u64],
528510
) -> Result<(), StoreError> {
529-
on_block_core(store, signed_block, true, local_validator_ids)
511+
on_block_core(store, signed_block, true)
530512
}
531513

532514
/// Process a new block without signature verification.
@@ -537,7 +519,7 @@ pub fn on_block_without_verification(
537519
store: &mut Store,
538520
signed_block: SignedBlockWithAttestation,
539521
) -> Result<(), StoreError> {
540-
on_block_core(store, signed_block, false, &[])
522+
on_block_core(store, signed_block, false)
541523
}
542524

543525
/// Core block processing logic.
@@ -548,7 +530,6 @@ fn on_block_core(
548530
store: &mut Store,
549531
signed_block: SignedBlockWithAttestation,
550532
verify: bool,
551-
local_validator_ids: &[u64],
552533
) -> Result<(), StoreError> {
553534
let _timing = metrics::time_fork_choice_block_processing();
554535

@@ -653,23 +634,17 @@ fn on_block_core(
653634
};
654635
store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload);
655636
} else {
656-
// Store the proposer's signature for potential future block building,
657-
// only if the proposer is in the same subnet as one of our validators.
658-
let proposer_subnet = compute_subnet_id(proposer_vid);
659-
let in_our_subnet = local_validator_ids
660-
.iter()
661-
.any(|&vid| compute_subnet_id(vid) == proposer_subnet);
662-
if in_our_subnet {
663-
let proposer_sig =
664-
ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature)
665-
.map_err(|_| StoreError::SignatureDecodingFailed)?;
666-
store.insert_gossip_signature(
667-
proposer_data_root,
668-
proposer_attestation.data.slot,
669-
proposer_vid,
670-
proposer_sig,
671-
);
672-
}
637+
// Store the proposer's signature unconditionally for future block building.
638+
// Subnet filtering is handled at the P2P subscription layer.
639+
let proposer_sig =
640+
ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature)
641+
.map_err(|_| StoreError::SignatureDecodingFailed)?;
642+
store.insert_gossip_signature(
643+
proposer_data_root,
644+
proposer_attestation.data.slot,
645+
proposer_vid,
646+
proposer_sig,
647+
);
673648
}
674649

675650
info!(%slot, %block_root, %state_root, "Processed new block");

crates/blockchain/tests/signature_spectests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> {
5555
store::on_tick(&mut st, block_time_ms, true, false);
5656

5757
// Process the block (this includes signature verification)
58-
let result = store::on_block(&mut st, signed_block, &[]);
58+
let result = store::on_block(&mut st, signed_block);
5959

6060
// Step 3: Check that it succeeded or failed as expected
6161
match (result.is_ok(), test.expect_exception.as_ref()) {

crates/net/p2p/src/gossipsub/handler.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tracing::{error, info, trace};
99

1010
use super::{
1111
encoding::{compress_message, decompress_message},
12-
messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND},
12+
messages::{
13+
AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND,
14+
attestation_subnet_topic,
15+
},
1316
};
1417
use crate::P2PServer;
1518

@@ -123,20 +126,27 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
123126
pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAttestation) {
124127
let slot = attestation.data.slot;
125128
let validator = attestation.validator_id;
129+
let subnet_id = validator % server.attestation_committee_count;
126130

127131
// Encode to SSZ
128132
let ssz_bytes = attestation.as_ssz_bytes();
129133

130134
// Compress with raw snappy
131135
let compressed = compress_message(&ssz_bytes);
132136

137+
// Look up subscribed topic or construct on-the-fly for gossipsub fanout
138+
let topic = server
139+
.attestation_topics
140+
.get(&subnet_id)
141+
.cloned()
142+
.unwrap_or_else(|| attestation_subnet_topic(subnet_id));
143+
133144
// Publish to the attestation subnet topic
134-
server
135-
.swarm_handle
136-
.publish(server.attestation_topic.clone(), compressed);
145+
server.swarm_handle.publish(topic, compressed);
137146
info!(
138147
%slot,
139148
validator,
149+
subnet_id,
140150
target_slot = attestation.data.target.slot,
141151
target_root = %ShortRoot(&attestation.data.target.root.0),
142152
source_slot = attestation.data.source.slot,

crates/net/p2p/src/gossipsub/messages.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,13 @@ pub const ATTESTATION_SUBNET_TOPIC_PREFIX: &str = "attestation";
88
///
99
/// Full topic format: `/leanconsensus/{network}/aggregation/ssz_snappy`
1010
pub const AGGREGATION_TOPIC_KIND: &str = "aggregation";
11+
12+
// TODO: make this configurable (e.g., via GenesisConfig or CLI)
13+
pub const NETWORK_NAME: &str = "devnet0";
14+
15+
/// Build an attestation subnet topic for the given subnet ID.
16+
pub fn attestation_subnet_topic(subnet_id: u64) -> libp2p::gossipsub::IdentTopic {
17+
libp2p::gossipsub::IdentTopic::new(format!(
18+
"/leanconsensus/{NETWORK_NAME}/{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}/ssz_snappy"
19+
))
20+
}

crates/net/p2p/src/gossipsub/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ pub use encoding::decompress_message;
66
pub use handler::{
77
handle_gossipsub_message, publish_aggregated_attestation, publish_attestation, publish_block,
88
};
9-
pub use messages::{AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND};
9+
pub use messages::{
10+
AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic,
11+
};

crates/net/p2p/src/lib.rs

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use tracing::{info, trace, warn};
3636

3737
use crate::{
3838
gossipsub::{
39-
AGGREGATION_TOPIC_KIND, ATTESTATION_SUBNET_TOPIC_PREFIX, BLOCK_TOPIC_KIND,
39+
AGGREGATION_TOPIC_KIND, BLOCK_TOPIC_KIND, NETWORK_NAME, attestation_subnet_topic,
4040
publish_aggregated_attestation, publish_attestation, publish_block,
4141
},
4242
req_resp::{
@@ -78,15 +78,17 @@ pub struct SwarmConfig {
7878
pub node_key: Vec<u8>,
7979
pub bootnodes: Vec<Bootnode>,
8080
pub listening_socket: SocketAddr,
81-
pub validator_id: Option<u64>,
81+
pub validator_ids: Vec<u64>,
8282
pub attestation_committee_count: u64,
8383
pub is_aggregator: bool,
84+
pub aggregate_subnet_ids: Option<Vec<u64>>,
8485
}
8586

8687
/// Result of building the swarm — contains all pieces needed to start the P2P actor.
8788
pub struct BuiltSwarm {
8889
pub(crate) swarm: libp2p::Swarm<Behaviour>,
89-
pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic,
90+
pub(crate) attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic>,
91+
pub(crate) attestation_committee_count: u64,
9092
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
9193
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
9294
pub(crate) bootnode_addrs: HashMap<PeerId, Multiaddr>,
@@ -184,10 +186,8 @@ pub fn build_swarm(
184186
.listen_on(addr)
185187
.expect("failed to bind gossipsub listening address");
186188

187-
let network = "devnet0";
188-
189189
// Subscribe to block topic (all nodes)
190-
let block_topic_str = format!("/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy");
190+
let block_topic_str = format!("/leanconsensus/{NETWORK_NAME}/{BLOCK_TOPIC_KIND}/ssz_snappy");
191191
let block_topic = libp2p::gossipsub::IdentTopic::new(block_topic_str);
192192
swarm
193193
.behaviour_mut()
@@ -197,42 +197,54 @@ pub fn build_swarm(
197197

198198
// Subscribe to aggregation topic (all validators)
199199
let aggregation_topic_str =
200-
format!("/leanconsensus/{network}/{AGGREGATION_TOPIC_KIND}/ssz_snappy");
200+
format!("/leanconsensus/{NETWORK_NAME}/{AGGREGATION_TOPIC_KIND}/ssz_snappy");
201201
let aggregation_topic = libp2p::gossipsub::IdentTopic::new(aggregation_topic_str);
202202
swarm
203203
.behaviour_mut()
204204
.gossipsub
205205
.subscribe(&aggregation_topic)
206206
.unwrap();
207207

208-
// Build attestation subnet topic (needed for publishing even without subscribing)
209-
// attestation_committee_count is validated to be >= 1 by clap at CLI parse time.
210-
let subnet_id = config
211-
.validator_id
208+
// Compute the set of subnets to subscribe to.
209+
// Validators subscribe for gossipsub mesh health; aggregators additionally
210+
// subscribe to any explicitly requested subnets.
211+
let validator_subnets: HashSet<u64> = config
212+
.validator_ids
213+
.iter()
212214
.map(|vid| vid % config.attestation_committee_count)
213-
.unwrap_or(0);
214-
metrics::set_attestation_committee_subnet(subnet_id);
215+
.collect();
215216

216-
let attestation_topic_kind = format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{subnet_id}");
217-
let attestation_topic_str =
218-
format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy");
219-
let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str);
217+
let mut subscribe_subnets: HashSet<u64> = validator_subnets.clone();
220218

221-
// Only aggregators subscribe to attestation subnets; non-aggregators
222-
// publish via gossipsub's fanout mechanism without subscribing.
223219
if config.is_aggregator {
224-
swarm
225-
.behaviour_mut()
226-
.gossipsub
227-
.subscribe(&attestation_topic)?;
228-
info!(%attestation_topic_kind, "Subscribed to attestation subnet");
220+
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
221+
subscribe_subnets.extend(explicit_ids);
222+
}
223+
// Aggregator with no validators and no explicit subnets: fallback to subnet 0
224+
if subscribe_subnets.is_empty() {
225+
subscribe_subnets.insert(0);
226+
}
227+
}
228+
229+
// Report lowest validator subnet for backward-compatible metric
230+
let metric_subnet = validator_subnets.iter().copied().min().unwrap_or(0);
231+
metrics::set_attestation_committee_subnet(metric_subnet);
232+
233+
// Build topics and subscribe
234+
let mut attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic> = HashMap::new();
235+
for &subnet_id in &subscribe_subnets {
236+
let topic = attestation_subnet_topic(subnet_id);
237+
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
238+
info!(subnet_id, "Subscribed to attestation subnet");
239+
attestation_topics.insert(subnet_id, topic);
229240
}
230241

231242
info!(socket=%config.listening_socket, "P2P node started");
232243

233244
Ok(BuiltSwarm {
234245
swarm,
235-
attestation_topic,
246+
attestation_topics,
247+
attestation_committee_count: config.attestation_committee_count,
236248
block_topic,
237249
aggregation_topic,
238250
bootnode_addrs,
@@ -255,7 +267,8 @@ impl P2P {
255267
swarm_handle,
256268
store,
257269
blockchain: None,
258-
attestation_topic: built.attestation_topic,
270+
attestation_topics: built.attestation_topics,
271+
attestation_committee_count: built.attestation_committee_count,
259272
block_topic: built.block_topic,
260273
aggregation_topic: built.aggregation_topic,
261274
connected_peers: HashSet::new(),
@@ -288,7 +301,8 @@ pub struct P2PServer {
288301
// BlockChain protocol ref (set via InitBlockChain message)
289302
pub(crate) blockchain: Option<P2PToBlockChainRef>,
290303

291-
pub(crate) attestation_topic: libp2p::gossipsub::IdentTopic,
304+
pub(crate) attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic>,
305+
pub(crate) attestation_committee_count: u64,
292306
pub(crate) block_topic: libp2p::gossipsub::IdentTopic,
293307
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,
294308

0 commit comments

Comments
 (0)