Skip to content

Commit 3432714

Browse files
authored
feat(simulator): implement PacketSimulator for deterministic simulator (#2769)
Summary: - Adds a `PacketSimulator` that models a full network fault-injection layer: per-path latency (exponential distribution), packet loss, link capacity limits with random eviction, automatic network partitioning with configurable lifecycle, and per-path clogging. - Adds a `ReadyQueue` — a min-heap priority queue with reservoir-sampled random ready removal, used to order packets by delivery tick and uniformly select among simultaneously ready items. - Partition modes: `UniformSize`, `UniformPartition`, `IsolateSingle`, all guarantee at least one node in each partition. Supports symmetric and asymmetric partitions. Note: - Pull-based batch delivery: `step()` returns all ready packets for the current tick in a single Vec. Packets delivered in the same tick cannot trigger chain reactions within that tick. This was done because its difficult to do self-referential structs in Rust and even with using dynamic dispatch we have to fight the borrow checker since we would need mutable access to different fields at the same time.
1 parent 6f01169 commit 3432714

10 files changed

Lines changed: 1432 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ dlopen2 = "0.8.2"
143143
dotenvy = "0.15.7"
144144
elasticsearch = { version = "9.1.0-alpha.1", features = ["rustls-tls"], default-features = false }
145145
enum_dispatch = "0.3.13"
146+
enumset = "1.1"
146147
env_logger = "0.11.9"
147148
err_trail = { version = "0.11.0", features = ["tracing"] }
148149
error_set = "0.9.1"

DEPENDENCIES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ embedded-io: 0.6.1, "Apache-2.0 OR MIT",
257257
encode_unicode: 1.0.0, "Apache-2.0 OR MIT",
258258
encoding_rs: 0.8.35, "(Apache-2.0 OR MIT) AND BSD-3-Clause",
259259
enum_dispatch: 0.3.13, "Apache-2.0 OR MIT",
260+
enumset: 1.1.10, "Apache-2.0 OR MIT",
261+
enumset_derive: 0.14.0, "Apache-2.0 OR MIT",
260262
equivalent: 1.0.2, "Apache-2.0 OR MIT",
261263
err_trail: 0.11.0, "Apache-2.0",
262264
errno: 0.3.14, "Apache-2.0 OR MIT",

core/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ comfy-table = { workspace = true }
4242
compio = { workspace = true }
4343
crossbeam = { workspace = true }
4444
derive_more = { workspace = true }
45+
enumset = { workspace = true }
4546
err_trail = { workspace = true }
4647
human-repr = { workspace = true }
4748
humantime = { workspace = true }

core/common/src/types/consensus/header.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use bytemuck::{Pod, Zeroable};
19+
use enumset::EnumSetType;
1920
use thiserror::Error;
2021

2122
const HEADER_SIZE: usize = 256;
@@ -28,7 +29,7 @@ pub trait ConsensusHeader: Sized + Pod + Zeroable {
2829
fn size(&self) -> u32;
2930
}
3031

31-
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
32+
#[derive(Default, Debug, EnumSetType)]
3233
#[repr(u8)]
3334
pub enum Command2 {
3435
#[default]

core/simulator/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ edition = "2024"
2424
bytemuck = { workspace = true }
2525
bytes = { workspace = true }
2626
consensus = { path = "../consensus" }
27+
enumset = { workspace = true }
2728
futures = { workspace = true }
2829
iggy_common = { path = "../common" }
2930
journal = { path = "../journal" }
3031
message_bus = { path = "../message_bus" }
3132
metadata = { path = "../metadata" }
3233
partitions = { path = "../partitions" }
34+
rand = { workspace = true }
35+
rand_xoshiro = { workspace = true }
3336
shard = { path = "../shard" }
37+
tracing = { workspace = true }

core/simulator/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
pub mod bus;
1919
pub mod client;
2020
pub mod deps;
21+
pub mod network;
22+
pub mod packet;
23+
pub mod ready_queue;
2124
pub mod replica;
2225

2326
use bus::MemBus;

core/simulator/src/network.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Network abstraction layer for the cluster simulator.
19+
//!
20+
//! **Note:** Currently a thin passthrough over `PacketSimulator`. Once the
21+
//! Cluster and MessageBus layers are built, this will own
22+
//! process-to-bus routing, and node enable/disable logic.
23+
24+
use crate::packet::{
25+
ALLOW_ALL, BLOCK_ALL, LinkFilter, Packet, PacketSimulator, PacketSimulatorOptions, ProcessId,
26+
};
27+
use iggy_common::{header::GenericHeader, message::Message};
28+
29+
/// Network layer for the cluster simulation.
30+
///
31+
/// This provides an interface over the `PacketSimulator` for the
32+
/// `Cluster` orchestrator to use. It handles:
33+
/// - Submitting packets into the network
34+
/// - Stepping the network to deliver ready packets
35+
/// - Managing network partitions and link states
36+
#[derive(Debug)]
37+
pub struct Network {
38+
simulator: PacketSimulator,
39+
}
40+
41+
impl Network {
42+
/// Create a new network.
43+
pub fn new(options: PacketSimulatorOptions) -> Self {
44+
Self {
45+
simulator: PacketSimulator::new(options),
46+
}
47+
}
48+
49+
/// Submit a message into the network.
50+
///
51+
/// The message will be queued with a simulated delay and may be:
52+
/// - Delivered normally after the delay
53+
/// - Dropped (based on packet_loss_probability)
54+
/// - Replayed/duplicated (based on replay_probability)
55+
pub fn submit(&mut self, from: ProcessId, to: ProcessId, message: Message<GenericHeader>) {
56+
self.simulator.submit(from, to, message);
57+
}
58+
59+
/// Deliver all ready packets.
60+
///
61+
/// The returned `Vec` is taken from an internal buffer. Pass it back via
62+
/// [`recycle_buffer`](Self::recycle_buffer) after processing to reuse the
63+
/// allocation on the next call.
64+
pub fn step(&mut self) -> Vec<Packet> {
65+
self.simulator.step()
66+
}
67+
68+
/// Return a previously taken buffer for reuse. See [`PacketSimulator::recycle_buffer`].
69+
pub fn recycle_buffer(&mut self, buf: Vec<Packet>) {
70+
self.simulator.recycle_buffer(buf);
71+
}
72+
73+
/// Advance network time by one tick.
74+
///
75+
/// This should be called once per simulation tick, after all ready
76+
/// packets have been delivered. Handles automatic partition lifecycle
77+
/// and random path clogging.
78+
pub fn tick(&mut self) {
79+
self.simulator.tick();
80+
}
81+
82+
/// Get the current network tick.
83+
pub fn current_tick(&self) -> u64 {
84+
self.simulator.current_tick()
85+
}
86+
87+
/// Register a client with the network.
88+
///
89+
/// Clients must be registered before they can send or receive packets.
90+
pub fn register_client(&mut self, client_id: u128) {
91+
self.simulator.register_client(client_id);
92+
}
93+
94+
/// Set the enabled/disabled state of a specific link.
95+
/// Maps `enabled = true` to [`ALLOW_ALL`] and `enabled = false` to [`BLOCK_ALL`].
96+
pub fn set_link_filter(&mut self, from: ProcessId, to: ProcessId, enabled: bool) {
97+
let filter = self.simulator.link_filter(from, to);
98+
*filter = if enabled { ALLOW_ALL } else { BLOCK_ALL };
99+
}
100+
101+
/// Get a mutable reference to a link's command filter.
102+
/// Allows per-command filtering (e.g., block only Prepare messages).
103+
pub fn link_filter_mut(&mut self, from: ProcessId, to: ProcessId) -> &mut LinkFilter {
104+
self.simulator.link_filter(from, to)
105+
}
106+
107+
/// Check whether a specific link is enabled (filter is not empty).
108+
pub fn is_link_enabled(&self, from: ProcessId, to: ProcessId) -> bool {
109+
self.simulator.is_link_enabled(from, to)
110+
}
111+
112+
/// Clear all partitions, restoring full connectivity.
113+
///
114+
/// **Warning:** resets all link filters to [`ALLOW_ALL`], including
115+
/// manually-set per-command filters.
116+
pub fn clear_partition(&mut self) {
117+
self.simulator.clear_partition();
118+
}
119+
120+
/// Clear all pending packets on a specific link.
121+
pub fn link_clear(&mut self, from: ProcessId, to: ProcessId) {
122+
self.simulator.link_clear(from, to);
123+
}
124+
125+
/// Returns a mutable reference to the link's optional drop-packet predicate.
126+
pub fn link_drop_packet_fn(
127+
&mut self,
128+
from: ProcessId,
129+
to: ProcessId,
130+
) -> &mut Option<fn(&Packet) -> bool> {
131+
self.simulator.link_drop_packet_fn(from, to)
132+
}
133+
134+
/// Clog a specific link (bidirectionally).
135+
///
136+
/// Clogged links do not deliver any packets until unclogged.
137+
pub fn clog(&mut self, from: ProcessId, to: ProcessId) {
138+
self.simulator.clog(from, to);
139+
}
140+
141+
/// Unclog a specific link (bidirectionally).
142+
pub fn unclog(&mut self, from: ProcessId, to: ProcessId) {
143+
self.simulator.unclog(from, to);
144+
}
145+
146+
/// Get the number of packets currently in flight.
147+
pub fn packets_in_flight(&self) -> usize {
148+
self.simulator.packets_in_flight()
149+
}
150+
}

0 commit comments

Comments
 (0)