Skip to content

Commit b5bf7e2

Browse files
committed
chore: refactor cur_tipset in MessagePool to use RwLock instead Mutex
1 parent 6fa222b commit b5bf7e2

4 files changed

Lines changed: 24 additions & 21 deletions

File tree

src/message_pool/msgpool/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::utils::get_size::CidWrapper;
2121
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
2222
use cid::Cid;
2323
use fvm_ipld_encoding::to_vec;
24-
use parking_lot::{Mutex, RwLock as SyncRwLock};
24+
use parking_lot::RwLock as SyncRwLock;
2525
use tracing::error;
2626
use utils::{get_base_fee_lower_bound, recover_sig};
2727

@@ -57,15 +57,15 @@ async fn republish_pending_messages<T>(
5757
api: &T,
5858
network_sender: &flume::Sender<NetworkMessage>,
5959
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
60-
cur_tipset: &Mutex<Arc<Tipset>>,
60+
cur_tipset: &SyncRwLock<Arc<Tipset>>,
6161
republished: &SyncRwLock<HashSet<Cid>>,
6262
local_addrs: &SyncRwLock<Vec<Address>>,
6363
chain_config: &ChainConfig,
6464
) -> Result<(), Error>
6565
where
6666
T: Provider,
6767
{
68-
let ts = cur_tipset.lock().clone();
68+
let ts = cur_tipset.read().clone();
6969
let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
7070

7171
republished.write().clear();
@@ -216,7 +216,7 @@ pub async fn head_change<T>(
216216
repub_trigger: Arc<flume::Sender<()>>,
217217
republished: &SyncRwLock<HashSet<Cid>>,
218218
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
219-
cur_tipset: &Mutex<Arc<Tipset>>,
219+
cur_tipset: &SyncRwLock<Arc<Tipset>>,
220220
revert: Vec<Tipset>,
221221
apply: Vec<Tipset>,
222222
) -> Result<(), Error>
@@ -227,7 +227,7 @@ where
227227
let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
228228
for ts in revert {
229229
let pts = api.load_tipset(ts.parents())?;
230-
*cur_tipset.lock() = pts;
230+
*cur_tipset.write() = pts;
231231

232232
let mut msgs: Vec<SignedMessage> = Vec::new();
233233
for block in ts.block_headers() {
@@ -266,7 +266,7 @@ where
266266
}
267267
}
268268
}
269-
*cur_tipset.lock() = Arc::new(ts);
269+
*cur_tipset.write() = Arc::new(ts);
270270
}
271271
if repub {
272272
repub_trigger
@@ -276,7 +276,7 @@ where
276276
}
277277
for (_, hm) in rmsgs {
278278
for (_, msg) in hm {
279-
let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.lock().clone())?;
279+
let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?;
280280
if let Err(e) = add_helper(api, bls_sig_cache, pending, msg, sequence) {
281281
error!("Failed to read message from reorg to mpool: {}", e);
282282
}
@@ -616,7 +616,7 @@ pub mod tests {
616616
// sleep allows for async block to update mpool's cur_tipset
617617
tokio::time::sleep(Duration::new(2, 0)).await;
618618

619-
let cur_ts = mpool.cur_tipset.lock().clone();
619+
let cur_ts = mpool.current_tipset();
620620
assert_eq!(cur_ts.as_ref(), &tipset);
621621
}
622622

src/message_pool/msgpool/msg_pool.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use futures::StreamExt;
3232
use fvm_ipld_encoding::to_vec;
3333
use itertools::Itertools;
3434
use nonzero_ext::nonzero;
35-
use parking_lot::{Mutex, RwLock as SyncRwLock};
35+
use parking_lot::RwLock as SyncRwLock;
3636
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
3737
use tracing::warn;
3838

@@ -177,7 +177,7 @@ pub struct MessagePool<T> {
177177
/// A map of pending messages where the key is the address
178178
pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
179179
/// The current tipset (a set of blocks)
180-
pub cur_tipset: Arc<Mutex<Arc<Tipset>>>,
180+
pub cur_tipset: Arc<SyncRwLock<Arc<Tipset>>>,
181181
/// The underlying provider
182182
pub api: Arc<T>,
183183
/// Sender half to send messages to other components
@@ -202,6 +202,11 @@ impl<T> MessagePool<T>
202202
where
203203
T: Provider,
204204
{
205+
/// Gets the current tipset
206+
pub fn current_tipset(&self) -> Arc<Tipset> {
207+
self.cur_tipset.read().clone()
208+
}
209+
205210
/// Add a signed message to the pool and its address.
206211
fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
207212
self.local_addrs.write().push(m.from());
@@ -214,7 +219,7 @@ where
214219
pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
215220
self.check_message(&msg)?;
216221
let cid = msg.cid();
217-
let cur_ts = self.cur_tipset.lock().clone();
222+
let cur_ts = self.current_tipset();
218223
let publish = self.add_tipset(msg.clone(), &cur_ts, true)?;
219224
let msg_ser = to_vec(&msg)?;
220225
let network_name = self.chain_config.network.genesis_name();
@@ -249,10 +254,8 @@ where
249254
/// fits the parameters to be pushed to the `MessagePool`.
250255
pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
251256
self.check_message(&msg)?;
252-
253-
let tip = self.cur_tipset.lock().clone();
254-
255-
self.add_tipset(msg, &tip, false)?;
257+
let ts = self.current_tipset();
258+
self.add_tipset(msg, &ts, false)?;
256259
Ok(())
257260
}
258261

@@ -320,7 +323,7 @@ where
320323
/// the pending hash-map.
321324
fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> {
322325
let from = msg.from();
323-
let cur_ts = self.cur_tipset.lock().clone();
326+
let cur_ts = self.current_tipset();
324327
add_helper(
325328
self.api.as_ref(),
326329
self.bls_sig_cache.as_ref(),
@@ -333,7 +336,7 @@ where
333336
/// Get the sequence for a given address, return Error if there is a failure
334337
/// to retrieve the respective sequence.
335338
pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
336-
let cur_ts = self.cur_tipset.lock().clone();
339+
let cur_ts = self.current_tipset();
337340

338341
let sequence = self.get_state_sequence(addr, &cur_ts)?;
339342

@@ -378,7 +381,7 @@ where
378381
)
379382
}
380383

381-
let cur_ts = self.cur_tipset.lock().clone();
384+
let cur_ts = self.current_tipset();
382385

383386
Ok((out, cur_ts))
384387
}
@@ -471,7 +474,7 @@ where
471474
{
472475
let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
473476
let pending = Arc::new(SyncRwLock::new(HashMap::new()));
474-
let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset()));
477+
let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
475478
let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry(
476479
"bls_sig_cache".into(),
477480
BLS_SIG_CACHE_SIZE,

src/message_pool/msgpool/selection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ where
290290
/// for inclusion from the pool, given the ticket quality of a miner.
291291
/// This method selects messages for including in a block.
292292
pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
293-
let cur_ts = self.cur_tipset.lock().clone();
293+
let cur_ts = self.current_tipset();
294294
// if the ticket quality is high enough that the first block has higher
295295
// probability than any other block, then we don't bother with optimal
296296
// selection because the first block will always have higher effective

src/rpc/methods/gas.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ impl GasEstimateGasLimit {
221221
.map(|s| s.into_iter().map(ChainMessage::Signed).collect::<Vec<_>>())
222222
.unwrap_or_default();
223223

224-
let ts = data.mpool.cur_tipset.lock().clone();
224+
let ts = data.mpool.current_tipset();
225225
// Pretend that the message is signed. This has an influence on the gas
226226
// cost. We obviously can't generate a valid signature. Instead, we just
227227
// fill the signature with zeros. The validity is not checked.

0 commit comments

Comments
 (0)