Skip to content

Commit 8dc2316

Browse files
hubciokriti-sc
authored andcommitted
feat(consensus): add loopback queue for primary self-addressed messages (apache#2825)
1 parent ccf8077 commit 8dc2316

6 files changed

Lines changed: 350 additions & 33 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/consensus/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ iggy_common = { workspace = true }
3333
message_bus = { workspace = true }
3434
rand = { workspace = true }
3535
rand_xoshiro = { workspace = true }
36+
37+
[dev-dependencies]
38+
futures = { workspace = true }

core/consensus/src/impls.rs

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ use crate::{
2222
};
2323
use bit_set::BitSet;
2424
use iggy_common::header::{
25-
Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader,
26-
StartViewChangeHeader, StartViewHeader,
25+
Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader, PrepareHeader, PrepareOkHeader,
26+
RequestHeader, StartViewChangeHeader, StartViewHeader,
2727
};
2828
use iggy_common::message::Message;
2929
use message_bus::IggyMessageBus;
@@ -435,7 +435,7 @@ where
435435
pipeline: RefCell<P>,
436436

437437
message_bus: B,
438-
// TODO: Add loopback_queue for messages to self
438+
loopback_queue: RefCell<VecDeque<Message<GenericHeader>>>,
439439
/// Tracks start view change messages received from all replicas (including self)
440440
start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
441441

@@ -484,6 +484,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
484484
last_prepare_checksum: Cell::new(0),
485485
pipeline: RefCell::new(pipeline),
486486
message_bus,
487+
loopback_queue: RefCell::new(VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX)),
487488
start_view_change_from_all_replicas: RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
488489
do_view_change_from_all_replicas: RefCell::new(dvc_quorum_array_empty()),
489490
do_view_change_quorum: Cell::new(false),
@@ -621,12 +622,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
621622
self.do_view_change_quorum.set(false);
622623
}
623624

624-
/// Reset all view change state for a new view.
625-
fn reset_view_change_state(&self) {
625+
/// Reset all view change state when transitioning to a new view.
626+
///
627+
/// Clears the loopback queue: stale PrepareOks from the old view
628+
/// reference pipeline entries that no longer exist, so processing
629+
/// them would be a no-op (handle_prepare_ok ignores unknown ops).
630+
/// The primary does not require its own self-ack for quorum.
631+
pub(crate) fn reset_view_change_state(&self) {
626632
self.reset_svc_quorum();
627633
self.reset_dvc_quorum();
628634
self.sent_own_start_view_change.set(false);
629635
self.sent_own_do_view_change.set(false);
636+
self.loopback_queue.borrow_mut().clear();
630637
}
631638

632639
/// Process one tick. Call this periodically (e.g., every 10ms).
@@ -1061,6 +1068,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
10611068
// Stale pipeline entries from the old view are invalid in the new view.
10621069
// Log reconciliation replays from the journal, not the pipeline.
10631070
self.pipeline.borrow_mut().clear();
1071+
// Stale PrepareOk messages from the old view must not leak into the new view.
1072+
// `reset_view_change_state` handles this for view-number advances (SVC/DVC/SV),
1073+
// but this path fires within the current view after DVC quorum -- so we clear
1074+
// the loopback queue directly.
1075+
self.loopback_queue.borrow_mut().clear();
10641076

10651077
// Update timeouts for normal primary operation
10661078
{
@@ -1079,12 +1091,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
10791091
}]
10801092
}
10811093

1082-
/// Handle a prepare_ok message from a follower.
1083-
/// Called on the primary when a follower acknowledges a prepare.
1094+
/// Handle a PrepareOk message from a replica.
10841095
///
1085-
/// Returns true if quorum was just reached for this op.
1086-
/// Handle a PrepareOk message. Returns true if quorum was reached.
1087-
/// Note: Caller (on_ack) should validate is_primary and status before calling.
1096+
/// Returns `true` if quorum was just reached for this op.
1097+
/// Caller (`on_ack`) should validate `is_primary` and status before calling.
10881098
pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
10891099
assert_eq!(header.command, Command2::PrepareOk);
10901100
assert!(
@@ -1139,6 +1149,42 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
11391149
false
11401150
}
11411151

1152+
/// Enqueue a self-addressed message for processing in the next loopback drain.
1153+
///
1154+
/// Currently only PrepareOk messages are routed here (via `send_or_loopback`).
1155+
// TODO: Route SVC/DVC self-messages through loopback once VsrAction dispatch is implemented.
1156+
pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) {
1157+
assert!(
1158+
self.loopback_queue.borrow().len() < PIPELINE_PREPARE_QUEUE_MAX,
1159+
"loopback queue overflow: {} items",
1160+
self.loopback_queue.borrow().len()
1161+
);
1162+
self.loopback_queue.borrow_mut().push_back(message);
1163+
}
1164+
1165+
/// Drain all pending loopback messages into `buf`, leaving the queue empty.
1166+
///
1167+
/// The caller must dispatch each drained message to the appropriate handler.
1168+
pub fn drain_loopback_into(&self, buf: &mut Vec<Message<GenericHeader>>) {
1169+
buf.extend(self.loopback_queue.borrow_mut().drain(..));
1170+
}
1171+
1172+
/// Send a message to `target`, routing self-addressed messages through the loopback queue.
1173+
pub(crate) async fn send_or_loopback(&self, target: u8, message: Message<GenericHeader>)
1174+
where
1175+
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
1176+
{
1177+
if target == self.replica {
1178+
self.push_loopback(message);
1179+
} else {
1180+
// TODO: Propagate send errors instead of panicking; requires bus error design.
1181+
self.message_bus
1182+
.send_to_replica(target, message)
1183+
.await
1184+
.unwrap();
1185+
}
1186+
}
1187+
11421188
pub fn message_bus(&self) -> &B {
11431189
&self.message_bus
11441190
}
@@ -1222,12 +1268,10 @@ where
12221268
type Sequencer = LocalSequencer;
12231269
type Pipeline = P;
12241270

1225-
// TODO(hubcio): maybe we could record the primary's own ack here
1226-
// (entry.add_ack(self.replica)) instead of round-tripping through
1227-
// the message bus via send_prepare_ok.
1228-
// This avoids serialization/queuing overhead and would also allow
1229-
// reordering to WAL-first (on_replicate before pipeline_message)
1230-
// without risking lost self-acks from dispatch timing.
1271+
// The primary's self-ack is delivered via the loopback queue
1272+
// (push_loopback / drain_loopback_into) rather than inline here,
1273+
// so that WAL persistence can happen between pipeline insertion
1274+
// and ack recording.
12311275
fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
12321276
assert!(self.is_primary(), "only primary can pipeline messages");
12331277

0 commit comments

Comments
 (0)