Skip to content

Commit 5050443

Browse files
committed
Fetch base_accounts and commit_ids in parallel
1 parent c581e9c commit 5050443

8 files changed

Lines changed: 170 additions & 204 deletions

File tree

magicblock-committor-service/src/intent_executor/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ pub mod two_stage_executor;
66

77
use std::{mem, ops::ControlFlow, sync::Arc, time::Duration};
88

9-
#[cfg(test)]
10-
mod null_task_info_fetcher;
11-
129
use async_trait::async_trait;
1310
use futures_util::future::{join, try_join_all};
1411
use log::{trace, warn};
@@ -25,8 +22,6 @@ use magicblock_rpc_client::{
2522
MagicBlockRpcClientError, MagicBlockSendTransactionConfig,
2623
MagicBlockSendTransactionOutcome, MagicblockRpcClient,
2724
};
28-
#[cfg(test)]
29-
pub use null_task_info_fetcher::*;
3025
use solana_keypair::Keypair;
3126
use solana_message::VersionedMessage;
3227
use solana_pubkey::Pubkey;

magicblock-committor-service/src/intent_executor/null_task_info_fetcher.rs

Lines changed: 0 additions & 42 deletions
This file was deleted.

magicblock-committor-service/src/intent_executor/task_info_fetcher.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ use dlp::{
99
use log::warn;
1010
use lru::LruCache;
1111
use magicblock_metrics::metrics;
12-
use magicblock_rpc_client::{
13-
MagicBlockRpcClientError, MagicBlockRpcClientResult, MagicblockRpcClient,
14-
};
12+
use magicblock_rpc_client::{MagicBlockRpcClientError, MagicblockRpcClient};
1513
use solana_account::Account;
1614
use solana_pubkey::Pubkey;
1715
use solana_signature::Signature;
@@ -40,10 +38,10 @@ pub trait TaskInfoFetcher: Send + Sync + 'static {
4038
/// Resets cache for some or all accounts
4139
fn reset(&self, reset_type: ResetType);
4240

43-
async fn get_base_account(
41+
async fn get_base_accounts(
4442
&self,
45-
_pubkey: &Pubkey,
46-
) -> MagicBlockRpcClientResult<Option<Account>>;
43+
pubkeys: &[Pubkey],
44+
) -> TaskInfoFetcherResult<HashMap<Pubkey, Account>>;
4745
}
4846

4947
pub enum ResetType<'a> {
@@ -272,11 +270,23 @@ impl TaskInfoFetcher for CacheTaskInfoFetcher {
272270
}
273271
}
274272

275-
async fn get_base_account(
273+
async fn get_base_accounts(
276274
&self,
277-
pubkey: &Pubkey,
278-
) -> MagicBlockRpcClientResult<Option<Account>> {
279-
self.rpc_client.get_account(pubkey).await
275+
pubkeys: &[Pubkey],
276+
) -> TaskInfoFetcherResult<HashMap<Pubkey, Account>> {
277+
self.rpc_client
278+
.get_multiple_accounts(pubkeys, None)
279+
.await
280+
.map_err(|err| {
281+
TaskInfoFetcherError::MagicBlockRpcClientError(Box::new(err))
282+
})
283+
.map(|accounts| {
284+
pubkeys
285+
.iter()
286+
.zip(accounts.into_iter())
287+
.filter_map(|(key, value)| value.map(|value| (*key, value)))
288+
.collect()
289+
})
280290
}
281291
}
282292

magicblock-committor-service/src/tasks/task_builder.rs

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
use std::sync::Arc;
22

33
use async_trait::async_trait;
4-
use futures_util::future::join_all;
54
use log::error;
65
use magicblock_program::magic_scheduled_base_intent::{
76
CommitType, CommittedAccount, MagicBaseIntent, ScheduledBaseIntent,
87
UndelegateType,
98
};
9+
use solana_account::Account;
1010
use solana_pubkey::Pubkey;
1111
use solana_signature::Signature;
1212

@@ -51,31 +51,18 @@ pub struct TaskBuilderImpl;
5151
pub const COMMIT_STATE_SIZE_THRESHOLD: usize = 256;
5252

5353
impl TaskBuilderImpl {
54-
pub async fn create_commit_task<C: TaskInfoFetcher>(
54+
pub fn create_commit_task(
5555
commit_id: u64,
5656
allow_undelegation: bool,
5757
account: CommittedAccount,
58-
task_info_fetcher: &Arc<C>,
58+
base_account: Option<Account>,
5959
) -> ArgsTask {
60-
let base_account = if account.account.data.len()
61-
> COMMIT_STATE_SIZE_THRESHOLD
62-
{
63-
match task_info_fetcher.get_base_account(&account.pubkey).await {
64-
Ok(Some(account)) => Some(account),
65-
Ok(None) => {
66-
log::warn!("AccountNotFound for commit_diff, pubkey: {}, commit_id: {}, Falling back to commit_state.",
67-
account.pubkey, commit_id);
68-
None
69-
}
70-
Err(e) => {
71-
log::warn!("Failed to fetch base account for commit diff, pubkey: {}, commit_id: {}, error: {}. Falling back to commit_state.",
72-
account.pubkey, commit_id, e);
73-
None
74-
}
75-
}
76-
} else {
77-
None
78-
};
60+
let base_account =
61+
if account.account.data.len() > COMMIT_STATE_SIZE_THRESHOLD {
62+
base_account
63+
} else {
64+
None
65+
};
7966

8067
if let Some(base_account) = base_account {
8168
ArgsTaskType::CommitDiff(CommitDiffTask {
@@ -123,14 +110,37 @@ impl TasksBuilder for TaskBuilderImpl {
123110
}
124111
};
125112

126-
let committed_pubkeys = accounts
127-
.iter()
128-
.map(|account| account.pubkey)
129-
.collect::<Vec<_>>();
130-
let commit_ids = commit_id_fetcher
131-
.fetch_next_commit_ids(&committed_pubkeys)
132-
.await
133-
.map_err(TaskBuilderError::CommitTasksBuildError)?;
113+
let (commit_ids, base_accounts) = {
114+
let committed_pubkeys = accounts
115+
.iter()
116+
.map(|account| account.pubkey)
117+
.collect::<Vec<_>>();
118+
119+
let diffable_pubkeys = accounts
120+
.iter()
121+
.filter(|account| {
122+
account.account.data.len() >= COMMIT_STATE_SIZE_THRESHOLD
123+
})
124+
.map(|account| account.pubkey)
125+
.collect::<Vec<_>>();
126+
127+
tokio::join!(
128+
commit_id_fetcher.fetch_next_commit_ids(&committed_pubkeys),
129+
commit_id_fetcher
130+
.get_base_accounts(diffable_pubkeys.as_slice())
131+
)
132+
};
133+
134+
let commit_ids =
135+
commit_ids.map_err(TaskBuilderError::CommitTasksBuildError)?;
136+
137+
let base_accounts = match base_accounts {
138+
Ok(map) => map,
139+
Err(err) => {
140+
log::warn!("Failed to fetch base accounts for CommitDiff (id={}): {}; falling back to CommitState", base_intent.id, err);
141+
Default::default()
142+
}
143+
};
134144

135145
// Persist commit ids for commitees
136146
commit_ids
@@ -141,13 +151,17 @@ impl TasksBuilder for TaskBuilderImpl {
141151
}
142152
});
143153

144-
let tasks = join_all(accounts
154+
let tasks = accounts
145155
.iter()
146-
.map(|account| async {
156+
.map(|account| {
147157
let commit_id = *commit_ids.get(&account.pubkey).expect("CommitIdFetcher provide commit ids for all listed pubkeys, or errors!");
148-
let task = Self::create_commit_task(commit_id, allow_undelegation, account.clone(), commit_id_fetcher).await;
158+
// TODO (snawaz): if accounts do not have duplicate, then we can use remove
159+
// instead:
160+
// let base_account = base_accounts.remove(&account.pubkey);
161+
let base_account = base_accounts.get(&account.pubkey).cloned();
162+
let task = Self::create_commit_task(commit_id, allow_undelegation, account.clone(), base_account);
149163
Box::new(task) as Box<dyn BaseTask>
150-
})).await;
164+
}).collect();
151165

152166
Ok(tasks)
153167
}

0 commit comments

Comments
 (0)