-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathmanager.rs
More file actions
165 lines (146 loc) · 6.49 KB
/
manager.rs
File metadata and controls
165 lines (146 loc) · 6.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use crate::protocol::{NewBlock, ScrollMessage, ScrollWireEvent};
use alloy_primitives::B256;
use futures::StreamExt;
use reth_network::cache::LruCache;
use reth_network_api::PeerId;
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::trace;
/// The size of the LRU cache used to track blocks that have been seen by peers.
///
/// Within this file it is the capacity passed to `PeerState::new` when `announce_block` creates
/// state for a peer that has none yet, so each of that peer's three block-hash caches is created
/// with this capacity.
pub const LRU_CACHE_SIZE: u32 = 100;
/// Tracks block announced and received state for a peer.
///
/// Three separate LRU caches of block hashes are kept: one for blocks we announced to the peer
/// and one per wire protocol for blocks received from the peer.
#[derive(Debug)]
pub struct PeerState {
/// Hashes of blocks announced to the peer.
announced: LruCache<B256>,
/// Hashes of blocks received via the scroll-wire protocol. This is used to penalize peers that
/// send duplicate blocks via scroll-wire.
scroll_wire_received: LruCache<B256>,
/// Hashes of blocks received via the eth-wire protocol. This is used to penalize peers that send
/// duplicate blocks via eth-wire.
eth_wire_received: LruCache<B256>,
}
impl PeerState {
    /// Builds an empty [`PeerState`] whose three LRU caches are each created with `capacity`.
    pub fn new(capacity: u32) -> Self {
        let announced = LruCache::new(capacity);
        let scroll_wire_received = LruCache::new(capacity);
        let eth_wire_received = LruCache::new(capacity);
        Self { announced, scroll_wire_received, eth_wire_received }
    }

    /// Returns `true` if `hash` is present in any of the caches, i.e. the block was announced to
    /// the peer or received from it over either protocol.
    pub fn has_seen(&self, hash: &B256) -> bool {
        // Checked in the order announced -> scroll-wire -> eth-wire, stopping at the first hit.
        [&self.announced, &self.scroll_wire_received, &self.eth_wire_received]
            .into_iter()
            .any(|cache| cache.contains(hash))
    }

    /// Returns `true` if `hash` is recorded as received over scroll-wire (used for duplicate
    /// detection on that protocol).
    pub fn has_seen_via_scroll_wire(&self, hash: &B256) -> bool {
        let received = &self.scroll_wire_received;
        received.contains(hash)
    }

    /// Returns `true` if `hash` is recorded as received over eth-wire (used for duplicate
    /// detection on that protocol).
    pub fn has_seen_via_eth_wire(&self, hash: &B256) -> bool {
        let received = &self.eth_wire_received;
        received.contains(hash)
    }

    /// Remembers that the peer delivered the block `hash` over scroll-wire.
    pub fn insert_scroll_wire(&mut self, hash: B256) {
        // Only the scroll-wire cache is touched; it backs duplicate detection for that protocol.
        let received = &mut self.scroll_wire_received;
        received.insert(hash);
    }

    /// Remembers that the peer delivered the block `hash` over eth-wire.
    pub fn insert_eth_wire(&mut self, hash: B256) {
        // Only the eth-wire cache is touched; it backs duplicate detection for that protocol.
        let received = &mut self.eth_wire_received;
        received.insert(hash);
    }

    /// Remembers that the block `hash` was announced by us to this peer.
    pub fn insert_announced(&mut self, hash: B256) {
        // The protocol-specific "received" caches are deliberately left untouched.
        let announced = &mut self.announced;
        announced.insert(hash);
    }
}
/// A manager for the `ScrollWire` protocol.
///
/// It owns the stream of protocol events, the per-peer message senders registered from
/// `ConnectionEstablished` events, and the per-peer block tracking state.
#[derive(Debug)]
pub struct ScrollWireManager {
/// A stream of [`ScrollWireEvent`]s produced by the scroll wire protocol.
events: UnboundedReceiverStream<ScrollWireEvent>,
/// A map from peer id to the sender used to push [`ScrollMessage`]s to that peer's connection.
connections: HashMap<PeerId, UnboundedSender<ScrollMessage>>,
/// Unified state tracking block state and blocks received from each peer via both protocols.
peer_state: HashMap<PeerId, PeerState>,
}
impl ScrollWireManager {
/// Creates a new [`ScrollWireManager`] instance.
pub fn new(events: UnboundedReceiver<ScrollWireEvent>) -> Self {
trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance");
Self { events: events.into(), connections: HashMap::new(), peer_state: HashMap::new() }
}
/// Announces a new block to the specified peer.
pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) {
if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) {
// We send the block to the peer. If we receive an error we remove the peer from the
// connections map and peer_block_state as the connection is no longer valid.
if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() {
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer.");
self.peer_state.remove(&peer_id);
to_connection.remove();
} else {
trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer");
// Record that we announced this block to the peer
self.peer_state
.entry(peer_id)
.or_insert_with(|| PeerState::new(LRU_CACHE_SIZE))
.insert_announced(hash);
}
}
}
/// Returns an iterator over the connected peer IDs.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.connections.keys()
}
/// Returns a reference to the peer state map.
pub const fn peer_state(&self) -> &HashMap<PeerId, PeerState> {
&self.peer_state
}
/// Returns a mutable reference to the peer state map.
pub const fn peer_state_mut(&mut self) -> &mut HashMap<PeerId, PeerState> {
&mut self.peer_state
}
}
impl Future for ScrollWireManager {
    type Output = ScrollWireEvent;

    /// Drains the event stream until a new block arrives or no more events are ready.
    ///
    /// `ConnectionEstablished` events are handled internally by registering the peer's sender.
    /// A `NewBlock` event is returned to the caller as `Poll::Ready`. In every other case the
    /// result is `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let manager = self.get_mut();

        loop {
            match manager.events.poll_next_unpin(cx) {
                Poll::Ready(Some(ScrollWireEvent::NewBlock { peer_id, block, signature })) => {
                    // Hand the received block straight back to whoever is polling the manager.
                    trace!(target: "scroll::wire::manager", "Received new block with signature [{signature:?}] from the network: {:?} ", block.hash_slow());
                    return Poll::Ready(ScrollWireEvent::NewBlock { peer_id, block, signature });
                }
                Poll::Ready(Some(ScrollWireEvent::ConnectionEstablished {
                    direction,
                    peer_id,
                    to_connection,
                })) => {
                    trace!(
                        target: "scroll::wire::manager",
                        peer_id = %peer_id,
                        direction = ?direction,
                        "Established connection with peer: {:?} for direction: {:?}",
                        peer_id,
                        direction
                    );
                    // Register the sender for this peer (replacing any previous one), then keep
                    // draining.
                    manager.connections.insert(peer_id, to_connection);
                }
                // NOTE(review): a closed stream (`None`) is reported as `Pending`, just like an
                // empty one; presumably nothing will wake the task again after that - confirm
                // this is intended.
                Poll::Ready(None) | Poll::Pending => return Poll::Pending,
            }
        }
    }
}