Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,12 +969,6 @@ where
fn join(&mut self, topic_hash: &TopicHash) {
tracing::debug!(topic=%topic_hash, "Running JOIN for topic");

// if we are already in the mesh, return
if self.mesh.contains_key(topic_hash) {
tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
return;
}

let mut added_peers = HashSet::new();
let mesh_n = self.config.mesh_n_for_topic(topic_hash);
if let Some(m) = self.metrics.as_mut() {
Expand Down Expand Up @@ -1015,9 +1009,9 @@ where
self.fanout_last_pub.remove(topic_hash);
}

let fanaout_added = added_peers.len();
let fanout_added = added_peers.len();
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
m.peers_included(topic_hash, Inclusion::Fanout, fanout_added)
}

// check if we need to get more peers, which we randomly select
Expand Down Expand Up @@ -1045,7 +1039,7 @@ where
mesh_peers.extend(new_peers);
}

let random_added = added_peers.len() - fanaout_added;
let random_added = added_peers.len() - fanout_added;
if let Some(m) = self.metrics.as_mut() {
m.peers_included(topic_hash, Inclusion::Random, random_added)
}
Expand Down Expand Up @@ -1251,14 +1245,6 @@ where

let mut iwant_ids = HashSet::new();

let want_message = |id: &MessageId| {
if self.duplicate_cache.contains(id) {
return false;
}

!self.gossip_promises.contains(id)
};

for (topic, ids) in ihave_msgs {
// only process the message if we are subscribed
if !self.mesh.contains_key(&topic) {
Expand All @@ -1269,7 +1255,13 @@ where
continue;
}

for id in ids.into_iter().filter(want_message) {
for id in ids.into_iter().filter(|id| {
if self.duplicate_cache.contains(id) {
return false;
}

!self.gossip_promises.contains(id)
}) {
// have not seen this message and are not currently requesting it
if iwant_ids.insert(id) {
// Register the IWANT metric
Expand Down Expand Up @@ -2158,7 +2150,7 @@ where
topic=%topic_hash,
"HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
peers.len(),
mesh_n_low
self.config.mesh_n()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But isn't mesh_n_low the minimum that it needs?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not in this case, we target mesh_n, see two lines bellow:

// not enough peers - get mesh_n - current_length more
let desired_peers = self.config.mesh_n() - peers.len();

which conforms to the spec

);
// not enough peers - get mesh_n - current_length more
let desired_peers = mesh_n - peers.len();
Expand All @@ -2185,9 +2177,9 @@ where
if peers.len() > mesh_n_high {
tracing::debug!(
topic=%topic_hash,
"HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
"HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
peers.len(),
mesh_n_high
self.config.mesh_n()
);
let excess_peer_no = peers.len() - mesh_n;

Expand Down
Loading