Skip to content

Commit a4885e5

Browse files
committed
feat: weighted round-robin proposer selection
1 parent 62efa15 commit a4885e5

9 files changed

Lines changed: 681 additions & 42 deletions

File tree

crates/consensus/src/context.rs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
use std::sync::Arc;
2+
13
use informalsystems_malachitebft_core_types::{
24
Context as MalachiteContext, NilOrVal, Round, ValueId, VoteType,
35
};
46

57
use crate::config::ConsensusConfig;
68
use crate::error::ConsensusError;
79
use crate::proposal::{CutProposal, CutProposalPart};
10+
use crate::proposer_selector::ProposerSelector;
811
use crate::signing::Ed25519SigningScheme;
912
use crate::types::{ConsensusHeight, ConsensusValue};
1013
use crate::validator_set::{ConsensusAddress, ConsensusValidator, ConsensusValidatorSet};
@@ -32,6 +35,8 @@ pub struct CipherBftContext {
3235
pub validator_set: ConsensusValidatorSet,
3336
/// Height the engine should start from.
3437
pub initial_height: ConsensusHeight,
38+
/// Proposer selector using Tendermint's weighted round-robin algorithm.
39+
proposer_selector: Arc<ProposerSelector>,
3540
}
3641

3742
impl CipherBftContext {
@@ -47,10 +52,12 @@ impl CipherBftContext {
4752
if validator_set.is_empty() {
4853
return Err(ConsensusError::EmptyValidatorSet);
4954
}
55+
let proposer_selector = Arc::new(ProposerSelector::new(&validator_set, initial_height));
5056
Ok(Self {
5157
config,
5258
validator_set,
5359
initial_height,
60+
proposer_selector,
5461
})
5562
}
5663

@@ -82,19 +89,24 @@ impl CipherBftContext {
8289
self.config.chain_id()
8390
}
8491

85-
/// Deterministic round-robin proposer selection.
86-
pub fn proposer_at_round(&self, round: Round) -> Option<ConsensusAddress> {
87-
let count = self.validator_set.len();
88-
if count == 0 {
92+
/// Access the proposer selector for external priority updates.
93+
pub fn proposer_selector(&self) -> &Arc<ProposerSelector> {
94+
&self.proposer_selector
95+
}
96+
97+
/// Weighted round-robin proposer selection for a given height and round.
98+
pub fn proposer_at_round(
99+
&self,
100+
height: ConsensusHeight,
101+
round: Round,
102+
) -> Option<ConsensusAddress> {
103+
if self.validator_set.is_empty() {
89104
return None;
90105
}
91-
92-
// Use round index modulo validator count; nil rounds map to first validator.
93-
let idx = match round.as_i64() {
94-
x if x < 0 => 0,
95-
x => (x as usize) % count,
96-
};
97-
self.validator_set.as_slice().get(idx).map(|v| v.address)
106+
let proposer = self
107+
.proposer_selector
108+
.select_proposer(&self.validator_set, height, round);
109+
Some(proposer.address)
98110
}
99111
}
100112

@@ -113,18 +125,11 @@ impl MalachiteContext for CipherBftContext {
113125
fn select_proposer<'a>(
114126
&self,
115127
validator_set: &'a Self::ValidatorSet,
116-
_height: Self::Height,
128+
height: Self::Height,
117129
round: Round,
118130
) -> &'a Self::Validator {
119-
let count = validator_set.len();
120-
let idx = match round.as_i64() {
121-
x if x < 0 => 0,
122-
x => (x as usize) % count.max(1),
123-
};
124-
validator_set
125-
.as_slice()
126-
.get(idx)
127-
.expect("validator_set must not be empty")
131+
self.proposer_selector
132+
.select_proposer(validator_set, height, round)
128133
}
129134

130135
fn new_proposal(

crates/consensus/src/host.rs

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ pub struct HostConfig {
6363
/// Higher values support longer sync windows for lagging nodes,
6464
/// at the cost of increased memory usage.
6565
pub decided_retention: usize,
66+
67+
/// Timeout in milliseconds to wait for the next cut after a decision (default: 50ms).
68+
///
69+
/// After consensus decides on a block, the host waits for the next cut to be
70+
/// available before starting the next consensus round. This prevents a race
71+
/// condition where consensus requests a value before DCL has produced the cut.
72+
///
73+
/// Lower values improve block throughput but may cause NIL votes if cuts
74+
/// aren't ready. Higher values reduce throughput but give DCL more time.
75+
/// Set to 0 to disable waiting entirely.
76+
pub wait_for_cut_timeout_ms: u64,
6677
}
6778

6879
impl Default for HostConfig {
@@ -76,6 +87,11 @@ impl Default for HostConfig {
7687
// 5 seconds. This larger window ensures peers can serve historical data for catch-up.
7788
// Note: Persistent storage (dcl_store) is also used for older blocks.
7889
decided_retention: 10000,
90+
// Reduced from 500ms to 50ms to improve block throughput.
91+
// Most cuts should be available within a few milliseconds if DCL is healthy.
92+
// If cuts aren't ready, consensus will proceed and may vote NIL on round 0,
93+
// but this is preferable to adding 500ms latency per block.
94+
wait_for_cut_timeout_ms: 50,
7995
}
8096
}
8197
}
@@ -200,6 +216,9 @@ pub struct CipherBftHost {
200216
/// Host configuration.
201217
config: HostConfig,
202218

219+
/// Proposer selector for weighted round-robin (optional for backward compatibility).
220+
proposer_selector: Option<Arc<crate::proposer_selector::ProposerSelector>>,
221+
203222
/// Tracing span for this actor.
204223
span: tracing::Span,
205224
}
@@ -226,10 +245,23 @@ impl CipherBftHost {
226245
value_builder,
227246
decision_handler,
228247
config,
248+
proposer_selector: None,
229249
span,
230250
}
231251
}
232252

253+
/// Set the proposer selector for weighted round-robin updates.
254+
///
255+
/// When set, the host will update the proposer selector's priorities
256+
/// when validator sets change at epoch boundaries.
257+
pub fn with_proposer_selector(
258+
mut self,
259+
proposer_selector: Arc<crate::proposer_selector::ProposerSelector>,
260+
) -> Self {
261+
self.proposer_selector = Some(proposer_selector);
262+
self
263+
}
264+
233265
/// Get the validator set for a specific height.
234266
///
235267
/// This method delegates to `ValidatorSetManager` to return the
@@ -679,7 +711,7 @@ impl Actor for CipherBftHost {
679711
}
680712

681713
// Check for epoch transition
682-
let _epoch_transition = match self.on_block_committed(height) {
714+
let epoch_transition = match self.on_block_committed(height) {
683715
Ok(true) => {
684716
info!(
685717
parent: &self.span,
@@ -701,10 +733,38 @@ impl Actor for CipherBftHost {
701733
}
702734
};
703735

736+
// Update proposer selector if validator set changed
737+
if epoch_transition {
738+
if let Some(ref selector) = self.proposer_selector {
739+
// Get the new validator set for the next height
740+
if let Ok(new_validator_set) = self.get_validator_set(height.next()) {
741+
selector.update_validator_set(&new_validator_set);
742+
debug!(
743+
parent: &self.span,
744+
height = height.0,
745+
"Updated proposer selector for new validator set"
746+
);
747+
}
748+
}
749+
}
750+
751+
// Periodic priority maintenance to prevent overflow
752+
if let Some(ref selector) = self.proposer_selector {
753+
if height.0 % 100 == 0 {
754+
selector.center_priorities();
755+
selector.scale_if_needed();
756+
trace!(
757+
parent: &self.span,
758+
height = height.0,
759+
"Centered proposer selector priorities"
760+
);
761+
}
762+
}
763+
704764
// Reply with the next height to start
705765
let next_height = height.next();
706766

707-
// CRITICAL: Wait for the next cut to be available before starting next height.
767+
// Wait for the next cut to be available before starting next height.
708768
//
709769
// This fixes a race condition where:
710770
// 1. on_decided() sends the decision to the node's event loop
@@ -716,21 +776,23 @@ impl Actor for CipherBftHost {
716776
// By waiting here, we ensure the node has processed the decision and
717777
// Primary has formed the next Cut before consensus starts requesting it.
718778
//
719-
// Timeout of 500ms should be sufficient for the node to process the
720-
// decision and form the Cut. If it times out, consensus will still
721-
// start but may vote NIL on the first round.
722-
let wait_timeout = std::time::Duration::from_millis(500);
723-
if !self
724-
.value_builder
725-
.wait_for_cut(next_height, wait_timeout)
726-
.await
727-
{
728-
warn!(
729-
parent: &self.span,
730-
next_height = next_height.0,
731-
"Cut not ready for next height after {:?}, proceeding anyway",
732-
wait_timeout
733-
);
779+
// The timeout is configurable via HostConfig::wait_for_cut_timeout_ms.
780+
// If it times out, consensus will still start but may vote NIL on round 0.
781+
let wait_timeout_ms = self.config.wait_for_cut_timeout_ms;
782+
if wait_timeout_ms > 0 {
783+
let wait_timeout = std::time::Duration::from_millis(wait_timeout_ms);
784+
if !self
785+
.value_builder
786+
.wait_for_cut(next_height, wait_timeout)
787+
.await
788+
{
789+
warn!(
790+
parent: &self.span,
791+
next_height = next_height.0,
792+
"Cut not ready for next height after {:?}, proceeding anyway",
793+
wait_timeout
794+
);
795+
}
734796
}
735797

736798
match self.get_validator_set(next_height) {
@@ -1748,6 +1810,7 @@ impl DecisionHandler for ChannelDecisionHandler {
17481810
/// * `decided_tx` - Channel to send decided events
17491811
/// * `network` - Optional network reference for publishing proposal parts
17501812
/// * `dcl_store` - Optional persistent storage for decisions (enables sync support)
1813+
/// * `wait_for_cut_timeout_ms` - Timeout in ms to wait for next cut after decision (0 to disable)
17511814
///
17521815
/// # Returns
17531816
///
@@ -1776,6 +1839,7 @@ pub async fn spawn_host(
17761839
decided_tx: Option<mpsc::Sender<(ConsensusHeight, Cut)>>,
17771840
network: Option<NetworkRef<CipherBftContext>>,
17781841
dcl_store: Option<Arc<dyn cipherbft_storage::DclStore>>,
1842+
wait_for_cut_timeout_ms: u64,
17791843
) -> anyhow::Result<HostRef<CipherBftContext>> {
17801844
// Extract validators from context
17811845
let validators: Vec<_> = ctx.validator_set.as_slice().to_vec();
@@ -1787,8 +1851,11 @@ pub async fn spawn_host(
17871851
.map_err(|e| anyhow::anyhow!("Failed to create validator set manager: {}", e))?,
17881852
);
17891853

1790-
// Create config with default retention values
1791-
let config = HostConfig::default();
1854+
// Create config with provided wait_for_cut_timeout
1855+
let config = HostConfig {
1856+
wait_for_cut_timeout_ms,
1857+
..Default::default()
1858+
};
17921859

17931860
// Create channel-based handlers with config values
17941861
let value_builder = if let Some(network_ref) = network {

crates/consensus/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ pub mod vote;
3737
#[cfg(feature = "malachite")]
3838
pub mod wal;
3939

40+
#[cfg(feature = "malachite")]
41+
pub mod proposer_selector;
42+
4043
#[cfg(feature = "malachite")]
4144
pub mod execution_handler;
4245

@@ -83,6 +86,9 @@ pub use vote::ConsensusVote;
8386
#[cfg(feature = "malachite")]
8487
pub use wal::spawn_wal;
8588

89+
#[cfg(feature = "malachite")]
90+
pub use proposer_selector::ProposerSelector;
91+
8692
#[cfg(feature = "malachite")]
8793
pub use execution_handler::{
8894
ExecutingDecisionHandler, ExecutionCallback, NoOpReceiptStore, ReceiptStore,

0 commit comments

Comments
 (0)