From 88601b6a494e7433f9bab09764ed9b76a1a887c4 Mon Sep 17 00:00:00 2001 From: PoulavBhowmick03 Date: Sat, 28 Mar 2026 13:09:24 +0530 Subject: [PATCH 01/10] feat: add mev cli tests --- Cargo.lock | 1 + crates/cli/Cargo.toml | 1 + crates/cli/src/commands/test/mev.rs | 918 +++++++++++++++++++++++++++- crates/cli/src/commands/test/mod.rs | 4 + 4 files changed, 915 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5a9d0b9..c956890d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5472,6 +5472,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "url", ] [[package]] diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 13c7c012..32cbd88f 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -35,6 +35,7 @@ serde_with = { workspace = true, features = ["base64"] } rand.workspace = true tempfile.workspace = true reqwest.workspace = true +url.workspace = true [dev-dependencies] tempfile.workspace = true diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index 473bbd9b..ce8fb2f5 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -1,9 +1,42 @@ //! MEV relay tests. -use super::{TestCategoryResult, TestConfigArgs}; -use crate::error::Result; +use std::{ + collections::HashMap, + io::Write, + time::{Duration, Instant}, +}; + +use reqwest::{Method, StatusCode}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use super::{ + AllCategoriesResult, SLOT_TIME, SLOTS_IN_EPOCH, TestCaseName, TestCategory, TestCategoryResult, + TestConfigArgs, TestResult, TestResultError, TestVerdict, calculate_score, evaluate_rtt, + filter_tests, must_output_to_file_on_quiet, publish_result_to_obol_api, request_rtt, + sort_tests, write_result_to_file, write_result_to_writer, +}; +use crate::{ + duration::Duration as CliDuration, + error::{CliError, Result}, +}; use clap::Args; -use std::io::Write; + +/// MEV-specific errors. +#[derive(Debug, thiserror::Error)] +enum MevError { + /// Relay returned non-200 for the header request. + #[error("status code not 200 OK")] + StatusCodeNot200, + #[error(transparent)] + Cli(#[from] CliError), +} + +/// Thresholds for MEV ping measure test. +const THRESHOLD_MEV_MEASURE_AVG: Duration = Duration::from_millis(40); +/// Threshold for poor MEV ping measure. +const THRESHOLD_MEV_MEASURE_POOR: Duration = Duration::from_millis(100); /// Arguments for the MEV test command. #[derive(Args, Clone, Debug)] @@ -38,13 +71,880 @@ pub struct TestMevArgs { help = "Increases the accuracy of the load test by asking for multiple payloads. Increases test duration." )] pub number_of_payloads: u32, + + /// X-Timeout-Ms header flag for each request in milliseconds. + #[arg( + long = "x-timeout-ms", + default_value = "1000", + help = "X-Timeout-Ms header flag for each request in milliseconds, used by MEVs to compute maximum delay for reply." + )] + pub x_timeout_ms: u32, +} + +/// A MEV test case function. +type TestCaseMev = + for<'a> fn( + token: CancellationToken, + conf: &'a TestMevArgs, + target: &'a str, + ) -> std::pin::Pin + Send + 'a>>; + +/// Returns the supported MEV test cases. +fn supported_mev_test_cases() -> HashMap { + HashMap::from([ + (TestCaseName::new("Ping", 1), mev_ping_test as TestCaseMev), + ( + TestCaseName::new("PingMeasure", 2), + mev_ping_measure_test as TestCaseMev, + ), + ( + TestCaseName::new("CreateBlock", 3), + mev_create_block_test as TestCaseMev, + ), + ]) } /// Runs the MEV relay tests. -pub async fn run(_args: TestMevArgs, _writer: &mut dyn Write) -> Result { - // TODO: Implement MEV tests - // - Ping - // - PingMeasure - // - CreateBlock - unimplemented!("mev test not yet implemented") +pub async fn run(args: TestMevArgs, writer: &mut dyn Write) -> Result { + must_output_to_file_on_quiet(args.test_config.quiet, &args.test_config.output_json)?; + + // Validate flag combinations. + if args.load_test && args.beacon_node_endpoint.is_none() { + return Err(CliError::Other( + "beacon-node-endpoint required when load-test enabled".to_string(), + )); + } + if !args.load_test && args.beacon_node_endpoint.is_some() { + return Err(CliError::Other( + "beacon-node-endpoint only supported when load-test enabled".to_string(), + )); + } + + info!("Starting MEV relays test"); + + let test_cases = supported_mev_test_cases(); + let queued_tests = filter_tests(&test_cases, args.test_config.test_cases.as_deref()); + if queued_tests.is_empty() { + return Err(CliError::Other("test case not supported".to_string())); + } + let mut queued_tests = queued_tests; + sort_tests(&mut queued_tests); + + let token = CancellationToken::new(); + let timeout_token = token.clone(); + tokio::spawn(async move { + tokio::time::sleep(args.test_config.timeout).await; + timeout_token.cancel(); + }); + + let start_time = Instant::now(); + let test_results = test_all_mevs(&queued_tests, &test_cases, &args, token).await; + let exec_time = CliDuration::new(start_time.elapsed()); + + let score = test_results + .values() + .map(|results| calculate_score(results)) + .min(); + + let res = TestCategoryResult { + category_name: Some(TestCategory::Mev), + targets: test_results, + execution_time: Some(exec_time), + score, + }; + + if !args.test_config.quiet { + write_result_to_writer(&res, writer)?; + } + + if !args.test_config.output_json.is_empty() { + write_result_to_file(&res, args.test_config.output_json.as_ref()).await?; + } + + if args.test_config.publish { + publish_result_to_obol_api( + AllCategoriesResult { + mev: Some(res.clone()), + ..Default::default() + }, + &args.test_config.publish_addr, + &args.test_config.publish_private_key_file, + ) + .await?; + } + + Ok(res) +} + +async fn test_all_mevs( + queued_tests: &[TestCaseName], + test_cases: &HashMap, + conf: &TestMevArgs, + token: CancellationToken, +) -> HashMap> { + let (tx, mut rx) = mpsc::channel::<(String, Vec)>(conf.endpoints.len()); + + for endpoint in &conf.endpoints { + let tx = tx.clone(); + let queued_tests = queued_tests.to_vec(); + + let test_cases = test_cases.clone(); + let endpoint = endpoint.clone(); + let conf = conf.clone(); + + let child_token = token.child_token(); + + tokio::spawn(async move { + let results = + test_single_mev(&queued_tests, &test_cases, &conf, &endpoint, child_token).await; + let relay_name = format_mev_relay_name(&endpoint); + let _ = tx.send((relay_name, results)).await; + }); + } + + drop(tx); + + let mut all_results = HashMap::new(); + while let Some((name, results)) = rx.recv().await { + all_results.insert(name, results); + } + + all_results +} + +async fn test_single_mev( + queued_tests: &[TestCaseName], + test_cases: &HashMap, + conf: &TestMevArgs, + target: &str, + token: CancellationToken, +) -> Vec { + let (result_tx, mut result_rx) = mpsc::channel::(queued_tests.len()); + + let queued = queued_tests.to_vec(); + let test_cases = test_cases.clone(); + let conf_clone = conf.clone(); + let target_owned = target.to_string(); + + let runner_token = token.child_token(); + + tokio::spawn(async move { + for t in &queued { + if runner_token.is_cancelled() { + return; + } + if let Some(test_fn) = test_cases.get(t) { + let result = test_fn(runner_token.clone(), &conf_clone, &target_owned).await; + if result_tx.send(result).await.is_err() { + return; + } + } + } + }); + + let mut all_results = Vec::new(); + let mut test_counter = 0usize; + + loop { + tokio::select! { + + _ = token.cancelled() => { + if test_counter < queued_tests.len() { + all_results.push(TestResult { + name: queued_tests[test_counter].name.clone(), + verdict: TestVerdict::Fail, + error: TestResultError::from_string("timeout/interrupted"), + ..TestResult::new("") + }); + } + break; + } + result = result_rx.recv() => { + match result { + Some(r) => { + test_counter = test_counter.saturating_add(1); + all_results.push(r); + } + None => break, + } + } + } + } + + all_results +} + +fn mev_ping_test<'a>( + token: CancellationToken, + _conf: &'a TestMevArgs, + target: &'a str, +) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move { + let test_res = TestResult::new("Ping"); + let url = format!("{target}/eth/v1/builder/status"); + let client = reqwest::Client::new(); + + let (clean_url, creds) = match parse_endpoint_credentials(&url) { + Ok(v) => v, + Err(e) => return test_res.fail(e), + }; + + let resp = tokio::select! { + _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), + r = apply_basic_auth(client.get(&clean_url), creds).send() => match r { + Ok(r) => r, + Err(e) => return test_res.fail(e), + } + }; + + if resp.status().as_u16() > 399 { + return test_res.fail(CliError::Other(http_status_error(resp.status()))); + } + + test_res.ok() + }) +} + +fn mev_ping_measure_test<'a>( + token: CancellationToken, + _conf: &'a TestMevArgs, + target: &'a str, +) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move { + let test_res = TestResult::new("PingMeasure"); + let url = format!("{target}/eth/v1/builder/status"); + + let rtt = tokio::select! { + _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), + r = request_rtt(&url, Method::GET, None, StatusCode::OK) => match r { + Ok(r) => r, + Err(e) => return test_res.fail(e), + } + }; + + evaluate_rtt( + rtt, + test_res, + THRESHOLD_MEV_MEASURE_AVG, + THRESHOLD_MEV_MEASURE_POOR, + ) + }) +} + +fn mev_create_block_test<'a>( + token: CancellationToken, + conf: &'a TestMevArgs, + target: &'a str, +) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move { + let test_res = TestResult::new("CreateBlock"); + + if !conf.load_test { + return TestResult { + verdict: TestVerdict::Skip, + ..test_res + }; + } + + let beacon_endpoint = match &conf.beacon_node_endpoint { + Some(ep) => ep.as_str(), + None => { + return test_res.fail(CliError::Other("beacon-node-endpoint required".to_string())); + } + }; + + let latest_block = match latest_beacon_block(beacon_endpoint, &token).await { + Ok(b) => b, + Err(e) => return test_res.fail(e), + }; + + let latest_block_ts_unix: i64 = match latest_block.body.execution_payload.timestamp.parse() + { + Ok(v) => v, + Err(e) => return test_res.fail(CliError::Other(format!("parse timestamp: {e}"))), + }; + + let latest_block_ts = std::time::UNIX_EPOCH + .checked_add(Duration::from_secs(latest_block_ts_unix.unsigned_abs())) + .unwrap_or(std::time::UNIX_EPOCH); + let next_block_ts = latest_block_ts + .checked_add(SLOT_TIME) + .unwrap_or(latest_block_ts); + + if let Ok(remaining) = next_block_ts.duration_since(std::time::SystemTime::now()) { + tokio::select! { + _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), + _ = tokio::time::sleep(remaining) => {} + } + } + + let latest_slot: i64 = match latest_block.slot.parse() { + Ok(v) => v, + Err(e) => return test_res.fail(CliError::Other(format!("parse slot: {e}"))), + }; + + let mut next_slot = latest_slot.saturating_add(1); + let slots_in_epoch_i64 = i64::try_from(SLOTS_IN_EPOCH).unwrap_or(i64::MAX); + let epoch = next_slot.checked_div(slots_in_epoch_i64).unwrap_or(0); + + let mut proposer_duties = + match fetch_proposers_for_epoch(beacon_endpoint, epoch, &token).await { + Ok(d) => d, + Err(e) => return test_res.fail(e), + }; + + let mut all_blocks_rtt: Vec = Vec::new(); + let x_timeout_ms = conf.x_timeout_ms; + + info!( + mev_relay = target, + blocks = conf.number_of_payloads, + x_timeout_ms = x_timeout_ms, + "Starting attempts for block creation" + ); + + let mut latest_block = latest_block; + + loop { + if token.is_cancelled() { + break; + } + + let start_iteration = Instant::now(); + + let rtt = match create_mev_block( + conf, + target, + x_timeout_ms, + next_slot, + &mut latest_block, + &mut proposer_duties, + beacon_endpoint, + &token, + ) + .await + { + Ok(r) => r, + Err(e) => return test_res.fail(e), + }; + + all_blocks_rtt.push(rtt); + if all_blocks_rtt.len() + == usize::try_from(conf.number_of_payloads).unwrap_or(usize::MAX) + { + break; + } + + let elapsed = start_iteration.elapsed(); + let elapsed_nanos = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX); + let slot_nanos = u64::try_from(SLOT_TIME.as_nanos()).unwrap_or(1); + let remainder_nanos = elapsed_nanos.checked_rem(slot_nanos).unwrap_or(0); + let slot_remainder = SLOT_TIME + .checked_sub(Duration::from_nanos(remainder_nanos)) + .unwrap_or_default(); + if let Some(sleep_dur) = slot_remainder.checked_sub(Duration::from_secs(1)) { + tokio::select! { + _ = token.cancelled() => break, + _ = tokio::time::sleep(sleep_dur) => {} + } + } + + let start_beacon_fetch = Instant::now(); + latest_block = match latest_beacon_block(beacon_endpoint, &token).await { + Ok(b) => b, + Err(e) => return test_res.fail(e), + }; + + let latest_slot_parsed: i64 = match latest_block.slot.parse() { + Ok(v) => v, + Err(e) => return test_res.fail(CliError::Other(format!("parse slot: {e}"))), + }; + + next_slot = latest_slot_parsed.saturating_add(1); + + // Wait 1 second minus how long the fetch took. + if let Some(sleep_dur) = + Duration::from_secs(1).checked_sub(start_beacon_fetch.elapsed()) + { + tokio::select! { + _ = token.cancelled() => break, + _ = tokio::time::sleep(sleep_dur) => {} + } + } + } + + if all_blocks_rtt.is_empty() { + return test_res.fail(CliError::Other("timeout/interrupted".to_string())); + } + + let total_rtt: Duration = all_blocks_rtt.iter().sum(); + let count = u32::try_from(all_blocks_rtt.len().max(1)).unwrap_or(u32::MAX); + let average_rtt = total_rtt.checked_div(count).unwrap_or_default(); + + let avg_threshold = Duration::from_millis( + u64::from(x_timeout_ms) + .saturating_mul(9) + .checked_div(10) + .unwrap_or(0), + ); + let poor_threshold = Duration::from_millis(u64::from(x_timeout_ms)); + + evaluate_rtt(average_rtt, test_res, avg_threshold, poor_threshold) + }) +} + +// Helper types +#[derive(Debug, Clone, serde::Deserialize)] +struct BeaconBlock { + data: BeaconBlockData, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct BeaconBlockData { + message: BeaconBlockMessage, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct BeaconBlockMessage { + slot: String, + body: BeaconBlockBody, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct BeaconBlockBody { + execution_payload: BeaconBlockExecPayload, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct BeaconBlockExecPayload { + block_hash: String, + timestamp: String, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct ProposerDuties { + data: Vec, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct ProposerDutiesData { + pubkey: String, + slot: String, +} + +#[derive(Debug, Clone, serde::Deserialize)] +struct BuilderBidResponse { + version: String, + data: serde_json::Value, +} + +async fn latest_beacon_block( + endpoint: &str, + token: &CancellationToken, +) -> Result { + let url = format!("{endpoint}/eth/v2/beacon/blocks/head"); + let (clean_url, creds) = parse_endpoint_credentials(&url)?; + let client = reqwest::Client::new(); + + let resp = tokio::select! { + _ = token.cancelled() => return Err(CliError::Other("timeout/interrupted".to_string())), + r = apply_basic_auth(client.get(&clean_url), creds).send() => { + r.map_err(|e| CliError::Other(format!("http request do: {e}")))? + } + }; + + let body = resp + .bytes() + .await + .map_err(|e| CliError::Other(format!("http response body: {e}")))?; + + let block: BeaconBlock = serde_json::from_slice(&body) + .map_err(|e| CliError::Other(format!("http response json: {e}")))?; + + Ok(block.data.message) +} + +async fn fetch_proposers_for_epoch( + beacon_endpoint: &str, + epoch: i64, + token: &CancellationToken, +) -> Result> { + let url = format!("{beacon_endpoint}/eth/v1/validator/duties/proposer/{epoch}"); + let (clean_url, creds) = parse_endpoint_credentials(&url)?; + let client = reqwest::Client::new(); + + let resp = tokio::select! { + _ = token.cancelled() => return Err(CliError::Other("timeout/interrupted".to_string())), + r = apply_basic_auth(client.get(&clean_url), creds).send() => { + r.map_err(|e| CliError::Other(format!("http request do: {e}")))? + } + }; + + let body = resp + .bytes() + .await + .map_err(|e| CliError::Other(format!("http response body: {e}")))?; + + let duties: ProposerDuties = serde_json::from_slice(&body) + .map_err(|e| CliError::Other(format!("http response json: {e}")))?; + + Ok(duties.data) +} + +fn get_validator_pk_for_slot(proposers: &[ProposerDutiesData], slot: i64) -> Option { + let slot_str = slot.to_string(); + proposers + .iter() + .find(|p| p.slot == slot_str) + .map(|p| p.pubkey.clone()) +} + +async fn get_block_header( + target: &str, + x_timeout_ms: u32, + next_slot: i64, + block_hash: &str, + validator_pub_key: &str, + token: &CancellationToken, +) -> std::result::Result<(BuilderBidResponse, Duration), MevError> { + let url = + format!("{target}/eth/v1/builder/header/{next_slot}/{block_hash}/{validator_pub_key}"); + + let (clean_url, creds) = parse_endpoint_credentials(&url) + .map_err(|e| MevError::Cli(CliError::Other(format!("parse url: {e}"))))?; + + let client = reqwest::Client::new(); + let start = Instant::now(); + + let resp = tokio::select! { + _ = token.cancelled() => { + return Err(MevError::Cli(CliError::Other("timeout/interrupted".to_string()))); + } + r = apply_basic_auth(client.get(&clean_url), creds) + .header("X-Timeout-Ms", x_timeout_ms.to_string()) + .header( + "Date-Milliseconds", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .to_string(), + ) + .send() => { + r.map_err(|e| MevError::Cli(CliError::Other(format!("http request rtt: {e}"))))? + } + }; + + let rtt = start.elapsed(); + + if resp.status() != StatusCode::OK { + return Err(MevError::StatusCodeNot200); + } + + let body = resp + .bytes() + .await + .map_err(|e| MevError::Cli(CliError::Other(format!("http response body: {e}"))))?; + + let bid: BuilderBidResponse = serde_json::from_slice(&body) + .map_err(|e| MevError::Cli(CliError::Other(format!("http response json: {e}"))))?; + + Ok((bid, rtt)) +} + +async fn create_mev_block( + _conf: &TestMevArgs, + target: &str, + x_timeout_ms: u32, + mut next_slot: i64, + latest_block: &mut BeaconBlockMessage, + proposer_duties: &mut Vec, + beacon_endpoint: &str, + token: &CancellationToken, +) -> Result { + let rtt_get_header; + let builder_bid; + + loop { + if token.is_cancelled() { + return Err(CliError::Other("timeout/interrupted".to_string())); + } + + let start_iteration = Instant::now(); + let slots_in_epoch_i64 = i64::try_from(SLOTS_IN_EPOCH).unwrap_or(i64::MAX); + let epoch = next_slot.checked_div(slots_in_epoch_i64).unwrap_or(0); + + let pk = if let Some(pk) = get_validator_pk_for_slot(proposer_duties, next_slot) { + pk + } else { + *proposer_duties = fetch_proposers_for_epoch(beacon_endpoint, epoch, token).await?; + get_validator_pk_for_slot(proposer_duties, next_slot) + .ok_or_else(|| CliError::Other("slot not found".to_string()))? + }; + + match get_block_header( + target, + x_timeout_ms, + next_slot, + &latest_block.body.execution_payload.block_hash, + &pk, + token, + ) + .await + { + Ok((bid, rtt)) => { + builder_bid = bid; + rtt_get_header = rtt; + + info!( + slot = next_slot, + target = target, + "Created block headers for slot" + ); + break; + } + + Err(MevError::StatusCodeNot200) => { + let elapsed = start_iteration.elapsed(); + if let Some(sleep_dur) = SLOT_TIME.checked_sub(elapsed) + && let Some(sleep_dur) = sleep_dur.checked_sub(Duration::from_secs(1)) + { + tokio::select! { + _ = token.cancelled() => { + return Err(CliError::Other("timeout/interrupted".to_string())); + } + _ = tokio::time::sleep(sleep_dur) => {} + } + } + + let start_beacon_fetch = Instant::now(); + *latest_block = latest_beacon_block(beacon_endpoint, token).await?; + next_slot = next_slot.saturating_add(1); + + if let Some(sleep_dur) = + Duration::from_secs(1).checked_sub(start_beacon_fetch.elapsed()) + { + tokio::select! { + _ = token.cancelled() => { + return Err(CliError::Other("timeout/interrupted".to_string())); + } + _ = tokio::time::sleep(sleep_dur) => {} + } + } + + continue; + } + Err(MevError::Cli(e)) => return Err(e), + } + } + + let payload = build_blinded_block_payload(&builder_bid)?; + let payload_json = serde_json::to_vec(&payload).map_err(|e| { + CliError::Other(format!( + "signed blinded beacon block json payload marshal: {e}" + )) + })?; + + let rtt_submit_block = tokio::select! { + _ = token.cancelled() => return Err(CliError::Other("timeout/interrupted".to_string())), + r = request_rtt( + format!("{target}/eth/v1/builder/blinded_blocks"), + Method::POST, + Some(payload_json), + StatusCode::BAD_REQUEST, + ) => r? + }; + + Ok(rtt_get_header + .checked_add(rtt_submit_block) + .unwrap_or(rtt_get_header)) +} + +fn build_blinded_block_payload(bid: &BuilderBidResponse) -> Result { + let sig_hex = "0xb9251a82040d4620b8c5665f328ee6c2eaa02d31d71d153f4abba31a7922a981e541e85283f0ced387d26e86aef9386d18c6982b9b5f8759882fe7f25a328180d86e146994ef19d28bc1432baf29751dec12b5f3d65dbbe224d72cf900c6831a"; + + let header = extract_execution_payload_header(&bid.data, &bid.version)?; + + let zero_hash = "0x0000000000000000000000000000000000000000000000000000000000000000"; + let zero_sig = "0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"; + + let mut body = serde_json::json!({ + "randao_reveal": zero_sig, + "eth1_data": { + "deposit_root": zero_hash, + "deposit_count": "0", + "block_hash": zero_hash + }, + "graffiti": zero_hash, + "proposer_slashings": [], + "attester_slashings": [], + "attestations": [], + "deposits": [], + "voluntary_exits": [], + "sync_aggregate": { + "sync_committee_bits": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "sync_committee_signature": zero_sig + }, + "execution_payload_header": header + }); + + let version_lower = bid.version.to_lowercase(); + + if matches!( + version_lower.as_str(), + "capella" | "deneb" | "electra" | "fulu" + ) { + body["bls_to_execution_changes"] = serde_json::json!([]); + } + + if matches!(version_lower.as_str(), "deneb" | "electra" | "fulu") { + body["blob_kzg_commitments"] = serde_json::json!([]); + } + + if matches!(version_lower.as_str(), "electra" | "fulu") { + body["execution_requests"] = serde_json::json!({ + "deposits": [], + "withdrawals": [], + "consolidations": [] + }); + } + + Ok(serde_json::json!({ + "message": { + "slot": "0", + "proposer_index": "0", + "parent_root": zero_hash, + "state_root": zero_hash, + "body": body + }, + "signature": sig_hex + })) +} + +fn extract_execution_payload_header( + data: &serde_json::Value, + version: &str, +) -> Result { + data.get("message") + .and_then(|m| m.get("header")) + .cloned() + .ok_or_else(|| { + CliError::Other(format!( + "not supported version or missing header: {version}" + )) + }) +} + +fn parse_endpoint_credentials(raw_url: &str) -> Result<(String, Option<(String, String)>)> { + let parsed = + url::Url::parse(raw_url).map_err(|e| CliError::Other(format!("parse url: {e}")))?; + + let creds = if !parsed.username().is_empty() { + Some(( + parsed.username().to_string(), + parsed.password().unwrap_or("").to_string(), + )) + } else { + None + }; + + let mut clean = parsed.clone(); + clean + .set_username("") + .map_err(|()| CliError::Other("set username on URL".to_string()))?; + clean + .set_password(None) + .map_err(|()| CliError::Other("set password on URL".to_string()))?; + + Ok((clean.to_string(), creds)) +} + +fn apply_basic_auth( + builder: reqwest::RequestBuilder, + creds: Option<(String, String)>, +) -> reqwest::RequestBuilder { + match creds { + Some((user, pass)) => builder.basic_auth(user, Some(pass)), + None => builder, + } +} + +fn format_mev_relay_name(url_string: &str) -> String { + let Some((scheme, rest)) = url_string.split_once("://") else { + return url_string.to_string(); + }; + + let Some((hash, host)) = rest.split_once('@') else { + return url_string.to_string(); + }; + + if !hash.starts_with("0x") || hash.len() < 18 { + return url_string.to_string(); + } + + let hash_short = format!("{}...{}", &hash[..6], &hash[hash.len().saturating_sub(4)..]); + format!("{scheme}://{hash_short}@{host}") +} + +fn http_status_error(status: StatusCode) -> String { + format!("status code {}", status.as_u16()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_mev_relay_name() { + assert_eq!( + format_mev_relay_name( + "https://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae@boost-relay.flashbots.net" + ), + "https://0xac6e...37ae@boost-relay.flashbots.net" + ); + + assert_eq!( + format_mev_relay_name("boost-relay.flashbots.net"), + "boost-relay.flashbots.net" + ); + + assert_eq!( + format_mev_relay_name("https://boost-relay.flashbots.net"), + "https://boost-relay.flashbots.net" + ); + + assert_eq!( + format_mev_relay_name("https://0xshort@boost-relay.flashbots.net"), + "https://0xshort@boost-relay.flashbots.net" + ); + + assert_eq!( + format_mev_relay_name("https://noprefixhashvalue1234567890@boost-relay.flashbots.net"), + "https://noprefixhashvalue1234567890@boost-relay.flashbots.net" + ); + } + + #[test] + fn test_get_validator_pk_for_slot() { + let duties = vec![ + ProposerDutiesData { + pubkey: "0xabc".to_string(), + slot: "100".to_string(), + }, + ProposerDutiesData { + pubkey: "0xdef".to_string(), + slot: "101".to_string(), + }, + ]; + + assert_eq!( + get_validator_pk_for_slot(&duties, 100), + Some("0xabc".to_string()) + ); + assert_eq!( + get_validator_pk_for_slot(&duties, 101), + Some("0xdef".to_string()) + ); + assert_eq!(get_validator_pk_for_slot(&duties, 102), None); + } } diff --git a/crates/cli/src/commands/test/mod.rs b/crates/cli/src/commands/test/mod.rs index 774b361d..a6df9b2c 100644 --- a/crates/cli/src/commands/test/mod.rs +++ b/crates/cli/src/commands/test/mod.rs @@ -221,6 +221,10 @@ impl TestResultError { Self(String::new()) } + pub(crate) fn from_string(s: impl Into) -> Self { + Self(s.into()) + } + pub(crate) fn is_empty(&self) -> bool { self.0.is_empty() } From 1583cc924e8586915896d4253c33b91b765a79c6 Mon Sep 17 00:00:00 2001 From: PoulavBhowmick03 Date: Sat, 28 Mar 2026 13:11:13 +0530 Subject: [PATCH 02/10] clippy --- crates/cli/src/commands/test/mev.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index ce8fb2f5..d0366516 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -652,6 +652,7 @@ async fn get_block_header( Ok((bid, rtt)) } +#[allow(clippy::too_many_arguments)] async fn create_mev_block( _conf: &TestMevArgs, target: &str, From e7f5fef48d737049ff4476a1e48d7f1600b3396e Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:08:50 -0300 Subject: [PATCH 03/10] Remove type aliases for test cases - Follow the `test validator` approach (enum + impl block) - Simplify error handling for timeouts --- crates/cli/src/commands/test/mev.rs | 499 +++++++++++++--------------- crates/cli/src/commands/test/mod.rs | 17 - 2 files changed, 229 insertions(+), 287 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index d0366516..f085a76e 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -7,17 +7,18 @@ use std::{ }; use reqwest::{Method, StatusCode}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task::JoinSet}; use tokio_util::sync::CancellationToken; use tracing::info; use super::{ - AllCategoriesResult, SLOT_TIME, SLOTS_IN_EPOCH, TestCaseName, TestCategory, TestCategoryResult, + AllCategoriesResult, SLOT_TIME, SLOTS_IN_EPOCH, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestResultError, TestVerdict, calculate_score, evaluate_rtt, - filter_tests, must_output_to_file_on_quiet, publish_result_to_obol_api, request_rtt, - sort_tests, write_result_to_file, write_result_to_writer, + must_output_to_file_on_quiet, publish_result_to_obol_api, request_rtt, write_result_to_file, + write_result_to_writer, }; use crate::{ + commands::test::TestCaseName, duration::Duration as CliDuration, error::{CliError, Result}, }; @@ -81,27 +82,33 @@ pub struct TestMevArgs { pub x_timeout_ms: u32, } -/// A MEV test case function. -type TestCaseMev = - for<'a> fn( - token: CancellationToken, - conf: &'a TestMevArgs, - target: &'a str, - ) -> std::pin::Pin + Send + 'a>>; - -/// Returns the supported MEV test cases. -fn supported_mev_test_cases() -> HashMap { - HashMap::from([ - (TestCaseName::new("Ping", 1), mev_ping_test as TestCaseMev), - ( - TestCaseName::new("PingMeasure", 2), - mev_ping_measure_test as TestCaseMev, - ), - ( - TestCaseName::new("CreateBlock", 3), - mev_create_block_test as TestCaseMev, - ), - ]) +#[derive(Debug, Clone)] +enum TestCaseMev { + Ping, + PingMeasure, + CreateBlock, +} + +impl TestCaseMev { + fn all() -> Vec { + vec![Self::Ping, Self::PingMeasure, Self::CreateBlock] + } + + fn test_case_name(&self) -> TestCaseName { + match self { + TestCaseMev::Ping => TestCaseName::new("Ping", 1), + TestCaseMev::PingMeasure => TestCaseName::new("PingMeasure", 2), + TestCaseMev::CreateBlock => TestCaseName::new("CreateBlock", 3), + } + } + + async fn run(&self, token: &CancellationToken, conf: &TestMevArgs, target: &str) -> TestResult { + match self { + TestCaseMev::Ping => mev_ping_test(token, conf, target).await, + TestCaseMev::PingMeasure => mev_ping_measure_test(token, conf, target).await, + TestCaseMev::CreateBlock => mev_create_block_test(token, conf, target).await, + } + } } /// Runs the MEV relay tests. @@ -122,13 +129,16 @@ pub async fn run(args: TestMevArgs, writer: &mut dyn Write) -> Result Result Result, + queued_tests: &[TestCaseMev], conf: &TestMevArgs, token: CancellationToken, ) -> HashMap> { let (tx, mut rx) = mpsc::channel::<(String, Vec)>(conf.endpoints.len()); for endpoint in &conf.endpoints { - let tx = tx.clone(); let queued_tests = queued_tests.to_vec(); - - let test_cases = test_cases.clone(); - let endpoint = endpoint.clone(); let conf = conf.clone(); - - let child_token = token.child_token(); + let endpoint = endpoint.clone(); + let token = token.clone(); + let tx = tx.clone(); tokio::spawn(async move { - let results = - test_single_mev(&queued_tests, &test_cases, &conf, &endpoint, child_token).await; + let results = test_single_mev(&queued_tests, &conf, &endpoint, token).await; let relay_name = format_mev_relay_name(&endpoint); let _ = tx.send((relay_name, results)).await; }); @@ -213,284 +218,238 @@ async fn test_all_mevs( } async fn test_single_mev( - queued_tests: &[TestCaseName], - test_cases: &HashMap, + queued_tests: &[TestCaseMev], conf: &TestMevArgs, target: &str, token: CancellationToken, ) -> Vec { - let (result_tx, mut result_rx) = mpsc::channel::(queued_tests.len()); - - let queued = queued_tests.to_vec(); - let test_cases = test_cases.clone(); - let conf_clone = conf.clone(); - let target_owned = target.to_string(); + let runner_queued_tests = queued_tests.to_vec(); - let runner_token = token.child_token(); + let mut join_set = JoinSet::new(); + for test_case in runner_queued_tests { + let runner_token = token.clone(); + let conf = conf.clone(); + let target = target.to_string(); - tokio::spawn(async move { - for t in &queued { - if runner_token.is_cancelled() { - return; - } - if let Some(test_fn) = test_cases.get(t) { - let result = test_fn(runner_token.clone(), &conf_clone, &target_owned).await; - if result_tx.send(result).await.is_err() { - return; + join_set.spawn(async move { + let tc_name = test_case.test_case_name(); + tokio::select! { + _ = runner_token.cancelled() => { + let tr = TestResult::new(&tc_name.name); + tr.fail(TestResultError::from_string("timeout/interrupted")) + } + r = test_case.run(&runner_token, &conf, &target) => { + r } } - } - }); + }); + } - let mut all_results = Vec::new(); - let mut test_counter = 0usize; + join_set.join_all().await +} - loop { - tokio::select! { +async fn mev_ping_test(token: &CancellationToken, _conf: &TestMevArgs, target: &str) -> TestResult { + let test_res = TestResult::new("Ping"); + let url = format!("{target}/eth/v1/builder/status"); + let client = reqwest::Client::new(); - _ = token.cancelled() => { - if test_counter < queued_tests.len() { - all_results.push(TestResult { - name: queued_tests[test_counter].name.clone(), - verdict: TestVerdict::Fail, - error: TestResultError::from_string("timeout/interrupted"), - ..TestResult::new("") - }); - } - break; - } - result = result_rx.recv() => { - match result { - Some(r) => { - test_counter = test_counter.saturating_add(1); - all_results.push(r); - } - None => break, - } - } + let (clean_url, creds) = match parse_endpoint_credentials(&url) { + Ok(v) => v, + Err(e) => return test_res.fail(e), + }; + + let resp = tokio::select! { + _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), + r = apply_basic_auth(client.get(&clean_url), creds).send() => match r { + Ok(r) => r, + Err(e) => return test_res.fail(e), } + }; + + if resp.status().as_u16() > 399 { + return test_res.fail(CliError::Other(http_status_error(resp.status()))); } - all_results + test_res.ok() } -fn mev_ping_test<'a>( - token: CancellationToken, - _conf: &'a TestMevArgs, - target: &'a str, -) -> std::pin::Pin + Send + 'a>> { - Box::pin(async move { - let test_res = TestResult::new("Ping"); - let url = format!("{target}/eth/v1/builder/status"); - let client = reqwest::Client::new(); - - let (clean_url, creds) = match parse_endpoint_credentials(&url) { - Ok(v) => v, +async fn mev_ping_measure_test( + token: &CancellationToken, + _conf: &TestMevArgs, + target: &str, +) -> TestResult { + let test_res = TestResult::new("PingMeasure"); + let url = format!("{target}/eth/v1/builder/status"); + + let rtt = tokio::select! { + _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), + r = request_rtt(&url, Method::GET, None, StatusCode::OK) => match r { + Ok(r) => r, Err(e) => return test_res.fail(e), - }; + } + }; - let resp = tokio::select! { - _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), - r = apply_basic_auth(client.get(&clean_url), creds).send() => match r { - Ok(r) => r, - Err(e) => return test_res.fail(e), - } + evaluate_rtt( + rtt, + test_res, + THRESHOLD_MEV_MEASURE_AVG, + THRESHOLD_MEV_MEASURE_POOR, + ) +} + +async fn mev_create_block_test( + token: &CancellationToken, + conf: &TestMevArgs, + target: &str, +) -> TestResult { + let test_res = TestResult::new("CreateBlock"); + + if !conf.load_test { + return TestResult { + verdict: TestVerdict::Skip, + ..test_res }; + } - if resp.status().as_u16() > 399 { - return test_res.fail(CliError::Other(http_status_error(resp.status()))); + let beacon_endpoint = match &conf.beacon_node_endpoint { + Some(ep) => ep.as_str(), + None => { + return test_res.fail(CliError::Other("beacon-node-endpoint required".to_string())); } + }; - test_res.ok() - }) -} + let latest_block = match latest_beacon_block(beacon_endpoint, &token).await { + Ok(b) => b, + Err(e) => return test_res.fail(e), + }; -fn mev_ping_measure_test<'a>( - token: CancellationToken, - _conf: &'a TestMevArgs, - target: &'a str, -) -> std::pin::Pin + Send + 'a>> { - Box::pin(async move { - let test_res = TestResult::new("PingMeasure"); - let url = format!("{target}/eth/v1/builder/status"); - - let rtt = tokio::select! { - _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), - r = request_rtt(&url, Method::GET, None, StatusCode::OK) => match r { - Ok(r) => r, - Err(e) => return test_res.fail(e), - } - }; + let latest_block_ts_unix: i64 = match latest_block.body.execution_payload.timestamp.parse() { + Ok(v) => v, + Err(e) => return test_res.fail(CliError::Other(format!("parse timestamp: {e}"))), + }; - evaluate_rtt( - rtt, - test_res, - THRESHOLD_MEV_MEASURE_AVG, - THRESHOLD_MEV_MEASURE_POOR, - ) - }) -} + let latest_block_ts = std::time::UNIX_EPOCH + .checked_add(Duration::from_secs(latest_block_ts_unix.unsigned_abs())) + .unwrap_or(std::time::UNIX_EPOCH); + let next_block_ts = latest_block_ts + .checked_add(SLOT_TIME) + .unwrap_or(latest_block_ts); -fn mev_create_block_test<'a>( - token: CancellationToken, - conf: &'a TestMevArgs, - target: &'a str, -) -> std::pin::Pin + Send + 'a>> { - Box::pin(async move { - let test_res = TestResult::new("CreateBlock"); - - if !conf.load_test { - return TestResult { - verdict: TestVerdict::Skip, - ..test_res - }; + if let Ok(remaining) = next_block_ts.duration_since(std::time::SystemTime::now()) { + tokio::select! { + _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), + _ = tokio::time::sleep(remaining) => {} } + } - let beacon_endpoint = match &conf.beacon_node_endpoint { - Some(ep) => ep.as_str(), - None => { - return test_res.fail(CliError::Other("beacon-node-endpoint required".to_string())); - } - }; + let latest_slot: i64 = match latest_block.slot.parse() { + Ok(v) => v, + Err(e) => return test_res.fail(CliError::Other(format!("parse slot: {e}"))), + }; - let latest_block = match latest_beacon_block(beacon_endpoint, &token).await { - Ok(b) => b, - Err(e) => return test_res.fail(e), - }; + let mut next_slot = latest_slot.saturating_add(1); + let slots_in_epoch_i64 = i64::try_from(SLOTS_IN_EPOCH).unwrap_or(i64::MAX); + let epoch = next_slot.checked_div(slots_in_epoch_i64).unwrap_or(0); + + let mut proposer_duties = match fetch_proposers_for_epoch(beacon_endpoint, epoch, &token).await + { + Ok(d) => d, + Err(e) => return test_res.fail(e), + }; + + let mut all_blocks_rtt: Vec = Vec::new(); + let x_timeout_ms = conf.x_timeout_ms; + + info!( + mev_relay = target, + blocks = conf.number_of_payloads, + x_timeout_ms = x_timeout_ms, + "Starting attempts for block creation" + ); + + let mut latest_block = latest_block; + + loop { + if token.is_cancelled() { + break; + } + + let start_iteration = Instant::now(); - let latest_block_ts_unix: i64 = match latest_block.body.execution_payload.timestamp.parse() + let rtt = match create_mev_block( + conf, + target, + x_timeout_ms, + next_slot, + &mut latest_block, + &mut proposer_duties, + beacon_endpoint, + &token, + ) + .await { - Ok(v) => v, - Err(e) => return test_res.fail(CliError::Other(format!("parse timestamp: {e}"))), + Ok(r) => r, + Err(e) => return test_res.fail(e), }; - let latest_block_ts = std::time::UNIX_EPOCH - .checked_add(Duration::from_secs(latest_block_ts_unix.unsigned_abs())) - .unwrap_or(std::time::UNIX_EPOCH); - let next_block_ts = latest_block_ts - .checked_add(SLOT_TIME) - .unwrap_or(latest_block_ts); + all_blocks_rtt.push(rtt); + if all_blocks_rtt.len() == usize::try_from(conf.number_of_payloads).unwrap_or(usize::MAX) { + break; + } - if let Ok(remaining) = next_block_ts.duration_since(std::time::SystemTime::now()) { + let elapsed = start_iteration.elapsed(); + let elapsed_nanos = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX); + let slot_nanos = u64::try_from(SLOT_TIME.as_nanos()).unwrap_or(1); + let remainder_nanos = elapsed_nanos.checked_rem(slot_nanos).unwrap_or(0); + let slot_remainder = SLOT_TIME + .checked_sub(Duration::from_nanos(remainder_nanos)) + .unwrap_or_default(); + if let Some(sleep_dur) = slot_remainder.checked_sub(Duration::from_secs(1)) { tokio::select! { - _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), - _ = tokio::time::sleep(remaining) => {} + _ = token.cancelled() => break, + _ = tokio::time::sleep(sleep_dur) => {} } } - let latest_slot: i64 = match latest_block.slot.parse() { + let start_beacon_fetch = Instant::now(); + latest_block = match latest_beacon_block(beacon_endpoint, &token).await { + Ok(b) => b, + Err(e) => return test_res.fail(e), + }; + + let latest_slot_parsed: i64 = match latest_block.slot.parse() { Ok(v) => v, Err(e) => return test_res.fail(CliError::Other(format!("parse slot: {e}"))), }; - let mut next_slot = latest_slot.saturating_add(1); - let slots_in_epoch_i64 = i64::try_from(SLOTS_IN_EPOCH).unwrap_or(i64::MAX); - let epoch = next_slot.checked_div(slots_in_epoch_i64).unwrap_or(0); - - let mut proposer_duties = - match fetch_proposers_for_epoch(beacon_endpoint, epoch, &token).await { - Ok(d) => d, - Err(e) => return test_res.fail(e), - }; - - let mut all_blocks_rtt: Vec = Vec::new(); - let x_timeout_ms = conf.x_timeout_ms; + next_slot = latest_slot_parsed.saturating_add(1); - info!( - mev_relay = target, - blocks = conf.number_of_payloads, - x_timeout_ms = x_timeout_ms, - "Starting attempts for block creation" - ); - - let mut latest_block = latest_block; - - loop { - if token.is_cancelled() { - break; - } - - let start_iteration = Instant::now(); - - let rtt = match create_mev_block( - conf, - target, - x_timeout_ms, - next_slot, - &mut latest_block, - &mut proposer_duties, - beacon_endpoint, - &token, - ) - .await - { - Ok(r) => r, - Err(e) => return test_res.fail(e), - }; - - all_blocks_rtt.push(rtt); - if all_blocks_rtt.len() - == usize::try_from(conf.number_of_payloads).unwrap_or(usize::MAX) - { - break; - } - - let elapsed = start_iteration.elapsed(); - let elapsed_nanos = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX); - let slot_nanos = u64::try_from(SLOT_TIME.as_nanos()).unwrap_or(1); - let remainder_nanos = elapsed_nanos.checked_rem(slot_nanos).unwrap_or(0); - let slot_remainder = SLOT_TIME - .checked_sub(Duration::from_nanos(remainder_nanos)) - .unwrap_or_default(); - if let Some(sleep_dur) = slot_remainder.checked_sub(Duration::from_secs(1)) { - tokio::select! { - _ = token.cancelled() => break, - _ = tokio::time::sleep(sleep_dur) => {} - } - } - - let start_beacon_fetch = Instant::now(); - latest_block = match latest_beacon_block(beacon_endpoint, &token).await { - Ok(b) => b, - Err(e) => return test_res.fail(e), - }; - - let latest_slot_parsed: i64 = match latest_block.slot.parse() { - Ok(v) => v, - Err(e) => return test_res.fail(CliError::Other(format!("parse slot: {e}"))), - }; - - next_slot = latest_slot_parsed.saturating_add(1); - - // Wait 1 second minus how long the fetch took. - if let Some(sleep_dur) = - Duration::from_secs(1).checked_sub(start_beacon_fetch.elapsed()) - { - tokio::select! { - _ = token.cancelled() => break, - _ = tokio::time::sleep(sleep_dur) => {} - } + // Wait 1 second minus how long the fetch took. + if let Some(sleep_dur) = Duration::from_secs(1).checked_sub(start_beacon_fetch.elapsed()) { + tokio::select! { + _ = token.cancelled() => break, + _ = tokio::time::sleep(sleep_dur) => {} } } + } - if all_blocks_rtt.is_empty() { - return test_res.fail(CliError::Other("timeout/interrupted".to_string())); - } + if all_blocks_rtt.is_empty() { + return test_res.fail(CliError::Other("timeout/interrupted".to_string())); + } - let total_rtt: Duration = all_blocks_rtt.iter().sum(); - let count = u32::try_from(all_blocks_rtt.len().max(1)).unwrap_or(u32::MAX); - let average_rtt = total_rtt.checked_div(count).unwrap_or_default(); + let total_rtt: Duration = all_blocks_rtt.iter().sum(); + let count = u32::try_from(all_blocks_rtt.len().max(1)).unwrap_or(u32::MAX); + let average_rtt = total_rtt.checked_div(count).unwrap_or_default(); - let avg_threshold = Duration::from_millis( - u64::from(x_timeout_ms) - .saturating_mul(9) - .checked_div(10) - .unwrap_or(0), - ); - let poor_threshold = Duration::from_millis(u64::from(x_timeout_ms)); + let avg_threshold = Duration::from_millis( + u64::from(x_timeout_ms) + .saturating_mul(9) + .checked_div(10) + .unwrap_or(0), + ); + let poor_threshold = Duration::from_millis(u64::from(x_timeout_ms)); - evaluate_rtt(average_rtt, test_res, avg_threshold, poor_threshold) - }) + evaluate_rtt(average_rtt, test_res, avg_threshold, poor_threshold) } // Helper types diff --git a/crates/cli/src/commands/test/mod.rs b/crates/cli/src/commands/test/mod.rs index a6df9b2c..01a555f7 100644 --- a/crates/cli/src/commands/test/mod.rs +++ b/crates/cli/src/commands/test/mod.rs @@ -642,23 +642,6 @@ pub(crate) fn calculate_score(results: &[TestResult]) -> CategoryScore { } } -/// Filters tests based on configuration. -pub(crate) fn filter_tests( - supported_test_cases: &HashMap, - test_cases: Option<&[String]>, -) -> Vec { - let mut filtered: Vec = supported_test_cases.keys().cloned().collect(); - if let Some(cases) = test_cases { - filtered.retain(|supported_case| cases.contains(&supported_case.name)); - } - filtered -} - -/// Sorts tests by their order field. -pub(crate) fn sort_tests(tests: &mut [TestCaseName]) { - tests.sort_by_key(|t| t.order); -} - async fn load_or_generate_key(path: &Path) -> CliResult { if tokio::fs::try_exists(path).await? { Ok(load(path)?) From c65b0a2fa7603f0432d66c024a31a01a7d60f0ca Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:18:13 -0300 Subject: [PATCH 04/10] Use `JoinSet` --- crates/cli/src/commands/test/mev.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index f085a76e..10f46dfa 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -7,7 +7,7 @@ use std::{ }; use reqwest::{Method, StatusCode}; -use tokio::{sync::mpsc, task::JoinSet}; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::info; @@ -191,30 +191,23 @@ async fn test_all_mevs( conf: &TestMevArgs, token: CancellationToken, ) -> HashMap> { - let (tx, mut rx) = mpsc::channel::<(String, Vec)>(conf.endpoints.len()); + let mut join_set = JoinSet::new(); for endpoint in &conf.endpoints { let queued_tests = queued_tests.to_vec(); let conf = conf.clone(); let endpoint = endpoint.clone(); let token = token.clone(); - let tx = tx.clone(); - tokio::spawn(async move { + join_set.spawn(async move { let results = test_single_mev(&queued_tests, &conf, &endpoint, token).await; let relay_name = format_mev_relay_name(&endpoint); - let _ = tx.send((relay_name, results)).await; + (relay_name, results) }); } - drop(tx); - - let mut all_results = HashMap::new(); - while let Some((name, results)) = rx.recv().await { - all_results.insert(name, results); - } - - all_results + let all_results = join_set.join_all().await; + all_results.into_iter().collect::>() } async fn test_single_mev( From 5b4d55e2124f3f042a92c6dea40e8df5b429a737 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:20:33 -0300 Subject: [PATCH 05/10] Consistent argument ordering --- crates/cli/src/commands/test/mev.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index 10f46dfa..ca27fda8 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -104,9 +104,9 @@ impl TestCaseMev { async fn run(&self, token: &CancellationToken, conf: &TestMevArgs, target: &str) -> TestResult { match self { - TestCaseMev::Ping => mev_ping_test(token, conf, target).await, - TestCaseMev::PingMeasure => mev_ping_measure_test(token, conf, target).await, - TestCaseMev::CreateBlock => mev_create_block_test(token, conf, target).await, + TestCaseMev::Ping => mev_ping_test(target, conf, token).await, + TestCaseMev::PingMeasure => mev_ping_measure_test(target, conf, token).await, + TestCaseMev::CreateBlock => mev_create_block_test(target, conf, token).await, } } } @@ -216,22 +216,21 @@ async fn test_single_mev( target: &str, token: CancellationToken, ) -> Vec { - let runner_queued_tests = queued_tests.to_vec(); - let mut join_set = JoinSet::new(); - for test_case in runner_queued_tests { - let runner_token = token.clone(); + + for test_case in queued_tests.to_owned() { + let token = token.clone(); let conf = conf.clone(); let target = target.to_string(); join_set.spawn(async move { let tc_name = test_case.test_case_name(); tokio::select! { - _ = runner_token.cancelled() => { + _ = token.cancelled() => { let tr = TestResult::new(&tc_name.name); tr.fail(TestResultError::from_string("timeout/interrupted")) } - r = test_case.run(&runner_token, &conf, &target) => { + r = test_case.run(&token, &conf, &target) => { r } } @@ -241,7 +240,7 @@ async fn test_single_mev( join_set.join_all().await } -async fn mev_ping_test(token: &CancellationToken, _conf: &TestMevArgs, target: &str) -> TestResult { +async fn mev_ping_test(target: &str, _conf: &TestMevArgs, token: &CancellationToken) -> TestResult { let test_res = TestResult::new("Ping"); let url = format!("{target}/eth/v1/builder/status"); let client = reqwest::Client::new(); @@ -267,9 +266,9 @@ async fn mev_ping_test(token: &CancellationToken, _conf: &TestMevArgs, target: & } async fn mev_ping_measure_test( - token: &CancellationToken, - _conf: &TestMevArgs, target: &str, + _conf: &TestMevArgs, + token: &CancellationToken, ) -> TestResult { let test_res = TestResult::new("PingMeasure"); let url = format!("{target}/eth/v1/builder/status"); @@ -291,9 +290,9 @@ async fn mev_ping_measure_test( } async fn mev_create_block_test( - token: &CancellationToken, - conf: &TestMevArgs, target: &str, + conf: &TestMevArgs, + token: &CancellationToken, ) -> TestResult { let test_res = TestResult::new("CreateBlock"); From 79dfc877939a80f81373c009c324bc4985d775c7 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:29:41 -0300 Subject: [PATCH 06/10] Remove select with CT - Test is already cancelled by caller appropriately --- crates/cli/src/commands/test/mev.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index ca27fda8..b023279b 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -328,10 +328,7 @@ async fn mev_create_block_test( .unwrap_or(latest_block_ts); if let Ok(remaining) = next_block_ts.duration_since(std::time::SystemTime::now()) { - tokio::select! { - _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), - _ = tokio::time::sleep(remaining) => {} - } + tokio::time::sleep(remaining).await; } let latest_slot: i64 = match latest_block.slot.parse() { @@ -362,10 +359,6 @@ async fn mev_create_block_test( let mut latest_block = latest_block; loop { - if token.is_cancelled() { - break; - } - let start_iteration = Instant::now(); let rtt = match create_mev_block( @@ -397,10 +390,7 @@ async fn mev_create_block_test( .checked_sub(Duration::from_nanos(remainder_nanos)) .unwrap_or_default(); if let Some(sleep_dur) = slot_remainder.checked_sub(Duration::from_secs(1)) { - tokio::select! { - _ = token.cancelled() => break, - _ = tokio::time::sleep(sleep_dur) => {} - } + tokio::time::sleep(sleep_dur).await; } let start_beacon_fetch = Instant::now(); @@ -418,10 +408,7 @@ async fn mev_create_block_test( // Wait 1 second minus how long the fetch took. if let Some(sleep_dur) = Duration::from_secs(1).checked_sub(start_beacon_fetch.elapsed()) { - tokio::select! { - _ = token.cancelled() => break, - _ = tokio::time::sleep(sleep_dur) => {} - } + tokio::time::sleep(sleep_dur).await; } } From dec0653b94754c724935cb04555602064d79cfe7 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:44:24 -0300 Subject: [PATCH 07/10] Reorder parameters --- crates/cli/src/commands/test/mev.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index b023279b..4a8bf428 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -102,7 +102,7 @@ impl TestCaseMev { } } - async fn run(&self, token: &CancellationToken, conf: &TestMevArgs, target: &str) -> TestResult { + async fn run(&self, target: &str, conf: &TestMevArgs, token: &CancellationToken) -> TestResult { match self { TestCaseMev::Ping => mev_ping_test(target, conf, token).await, TestCaseMev::PingMeasure => mev_ping_measure_test(target, conf, token).await, @@ -230,7 +230,7 @@ async fn test_single_mev( let tr = TestResult::new(&tc_name.name); tr.fail(TestResultError::from_string("timeout/interrupted")) } - r = test_case.run(&token, &conf, &target) => { + r = test_case.run(&target, &conf, &token) => { r } } From 97061613342fb8fc9eae5c046c0415d9b84ba692 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:51:45 -0300 Subject: [PATCH 08/10] Remove redundant `CancellationToken`s --- crates/cli/src/commands/test/mev.rs | 148 ++++++++++------------------ 1 file changed, 50 insertions(+), 98 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index 4a8bf428..079c02a7 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -102,11 +102,11 @@ impl TestCaseMev { } } - async fn run(&self, target: &str, conf: &TestMevArgs, token: &CancellationToken) -> TestResult { + async fn run(&self, target: &str, conf: &TestMevArgs) -> TestResult { match self { - TestCaseMev::Ping => mev_ping_test(target, conf, token).await, - TestCaseMev::PingMeasure => mev_ping_measure_test(target, conf, token).await, - TestCaseMev::CreateBlock => mev_create_block_test(target, conf, token).await, + TestCaseMev::Ping => mev_ping_test(target, conf).await, + TestCaseMev::PingMeasure => mev_ping_measure_test(target, conf).await, + TestCaseMev::CreateBlock => mev_create_block_test(target, conf).await, } } } @@ -230,7 +230,7 @@ async fn test_single_mev( let tr = TestResult::new(&tc_name.name); tr.fail(TestResultError::from_string("timeout/interrupted")) } - r = test_case.run(&target, &conf, &token) => { + r = test_case.run(&target, &conf) => { r } } @@ -240,7 +240,7 @@ async fn test_single_mev( join_set.join_all().await } -async fn mev_ping_test(target: &str, _conf: &TestMevArgs, token: &CancellationToken) -> TestResult { +async fn mev_ping_test(target: &str, _conf: &TestMevArgs) -> TestResult { let test_res = TestResult::new("Ping"); let url = format!("{target}/eth/v1/builder/status"); let client = reqwest::Client::new(); @@ -250,12 +250,9 @@ async fn mev_ping_test(target: &str, _conf: &TestMevArgs, token: &CancellationTo Err(e) => return test_res.fail(e), }; - let resp = tokio::select! { - _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), - r = apply_basic_auth(client.get(&clean_url), creds).send() => match r { - Ok(r) => r, - Err(e) => return test_res.fail(e), - } + let resp = match apply_basic_auth(client.get(&clean_url), creds).send().await { + Ok(r) => r, + Err(e) => return test_res.fail(e), }; if resp.status().as_u16() > 399 { @@ -265,20 +262,13 @@ async fn mev_ping_test(target: &str, _conf: &TestMevArgs, token: &CancellationTo test_res.ok() } -async fn mev_ping_measure_test( - target: &str, - _conf: &TestMevArgs, - token: &CancellationToken, -) -> TestResult { +async fn mev_ping_measure_test(target: &str, _conf: &TestMevArgs) -> TestResult { let test_res = TestResult::new("PingMeasure"); let url = format!("{target}/eth/v1/builder/status"); - let rtt = tokio::select! { - _ = token.cancelled() => return test_res.fail(CliError::Other("timeout/interrupted".to_string())), - r = request_rtt(&url, Method::GET, None, StatusCode::OK) => match r { - Ok(r) => r, - Err(e) => return test_res.fail(e), - } + let rtt = match request_rtt(&url, Method::GET, None, StatusCode::OK).await { + Ok(r) => r, + Err(e) => return test_res.fail(e), }; evaluate_rtt( @@ -289,11 +279,7 @@ async fn mev_ping_measure_test( ) } -async fn mev_create_block_test( - target: &str, - conf: &TestMevArgs, - token: &CancellationToken, -) -> TestResult { +async fn mev_create_block_test(target: &str, conf: &TestMevArgs) -> TestResult { let test_res = TestResult::new("CreateBlock"); if !conf.load_test { @@ -310,7 +296,7 @@ async fn mev_create_block_test( } }; - let latest_block = match latest_beacon_block(beacon_endpoint, &token).await { + let latest_block = match latest_beacon_block(beacon_endpoint).await { Ok(b) => b, Err(e) => return test_res.fail(e), }; @@ -340,8 +326,7 @@ async fn mev_create_block_test( let slots_in_epoch_i64 = i64::try_from(SLOTS_IN_EPOCH).unwrap_or(i64::MAX); let epoch = next_slot.checked_div(slots_in_epoch_i64).unwrap_or(0); - let mut proposer_duties = match fetch_proposers_for_epoch(beacon_endpoint, epoch, &token).await - { + let mut proposer_duties = match fetch_proposers_for_epoch(beacon_endpoint, epoch).await { Ok(d) => d, Err(e) => return test_res.fail(e), }; @@ -369,7 +354,6 @@ async fn mev_create_block_test( &mut latest_block, &mut proposer_duties, beacon_endpoint, - &token, ) .await { @@ -394,7 +378,7 @@ async fn mev_create_block_test( } let start_beacon_fetch = Instant::now(); - latest_block = match latest_beacon_block(beacon_endpoint, &token).await { + latest_block = match latest_beacon_block(beacon_endpoint).await { Ok(b) => b, Err(e) => return test_res.fail(e), }; @@ -476,20 +460,15 @@ struct BuilderBidResponse { data: serde_json::Value, } -async fn latest_beacon_block( - endpoint: &str, - token: &CancellationToken, -) -> Result { +async fn latest_beacon_block(endpoint: &str) -> Result { let url = format!("{endpoint}/eth/v2/beacon/blocks/head"); let (clean_url, creds) = parse_endpoint_credentials(&url)?; let client = reqwest::Client::new(); - let resp = tokio::select! { - _ = token.cancelled() => return Err(CliError::Other("timeout/interrupted".to_string())), - r = apply_basic_auth(client.get(&clean_url), creds).send() => { - r.map_err(|e| CliError::Other(format!("http request do: {e}")))? - } - }; + let resp = apply_basic_auth(client.get(&clean_url), creds) + .send() + .await + .map_err(|e| CliError::Other(format!("http request do: {e}")))?; let body = resp .bytes() @@ -505,18 +484,15 @@ async fn latest_beacon_block( async fn fetch_proposers_for_epoch( beacon_endpoint: &str, epoch: i64, - token: &CancellationToken, ) -> Result> { let url = format!("{beacon_endpoint}/eth/v1/validator/duties/proposer/{epoch}"); let (clean_url, creds) = parse_endpoint_credentials(&url)?; let client = reqwest::Client::new(); - let resp = tokio::select! { - _ = token.cancelled() => return Err(CliError::Other("timeout/interrupted".to_string())), - r = apply_basic_auth(client.get(&clean_url), creds).send() => { - r.map_err(|e| CliError::Other(format!("http request do: {e}")))? - } - }; + let resp = apply_basic_auth(client.get(&clean_url), creds) + .send() + .await + .map_err(|e| CliError::Other(format!("http request do: {e}")))?; let body = resp .bytes() @@ -543,7 +519,6 @@ async fn get_block_header( next_slot: i64, block_hash: &str, validator_pub_key: &str, - token: &CancellationToken, ) -> std::result::Result<(BuilderBidResponse, Duration), MevError> { let url = format!("{target}/eth/v1/builder/header/{next_slot}/{block_hash}/{validator_pub_key}"); @@ -554,24 +529,19 @@ async fn get_block_header( let client = reqwest::Client::new(); let start = Instant::now(); - let resp = tokio::select! { - _ = token.cancelled() => { - return Err(MevError::Cli(CliError::Other("timeout/interrupted".to_string()))); - } - r = apply_basic_auth(client.get(&clean_url), creds) - .header("X-Timeout-Ms", x_timeout_ms.to_string()) - .header( - "Date-Milliseconds", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() - .to_string(), - ) - .send() => { - r.map_err(|e| MevError::Cli(CliError::Other(format!("http request rtt: {e}"))))? - } - }; + let resp = apply_basic_auth(client.get(&clean_url), creds) + .header("X-Timeout-Ms", x_timeout_ms.to_string()) + .header( + "Date-Milliseconds", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .to_string(), + ) + .send() + .await + .map_err(|e| MevError::Cli(CliError::Other(format!("http request rtt: {e}"))))?; let rtt = start.elapsed(); @@ -599,16 +569,11 @@ async fn create_mev_block( latest_block: &mut BeaconBlockMessage, proposer_duties: &mut Vec, beacon_endpoint: &str, - token: &CancellationToken, ) -> Result { let rtt_get_header; let builder_bid; loop { - if token.is_cancelled() { - return Err(CliError::Other("timeout/interrupted".to_string())); - } - let start_iteration = Instant::now(); let slots_in_epoch_i64 = i64::try_from(SLOTS_IN_EPOCH).unwrap_or(i64::MAX); let epoch = next_slot.checked_div(slots_in_epoch_i64).unwrap_or(0); @@ -616,7 +581,7 @@ async fn create_mev_block( let pk = if let Some(pk) = get_validator_pk_for_slot(proposer_duties, next_slot) { pk } else { - *proposer_duties = fetch_proposers_for_epoch(beacon_endpoint, epoch, token).await?; + *proposer_duties = fetch_proposers_for_epoch(beacon_endpoint, epoch).await?; get_validator_pk_for_slot(proposer_duties, next_slot) .ok_or_else(|| CliError::Other("slot not found".to_string()))? }; @@ -627,7 +592,6 @@ async fn create_mev_block( next_slot, &latest_block.body.execution_payload.block_hash, &pk, - token, ) .await { @@ -648,27 +612,17 @@ async fn create_mev_block( if let Some(sleep_dur) = SLOT_TIME.checked_sub(elapsed) && let Some(sleep_dur) = sleep_dur.checked_sub(Duration::from_secs(1)) { - tokio::select! { - _ = token.cancelled() => { - return Err(CliError::Other("timeout/interrupted".to_string())); - } - _ = tokio::time::sleep(sleep_dur) => {} - } + tokio::time::sleep(sleep_dur).await; } let start_beacon_fetch = Instant::now(); - *latest_block = latest_beacon_block(beacon_endpoint, token).await?; + *latest_block = latest_beacon_block(beacon_endpoint).await?; next_slot = next_slot.saturating_add(1); if let Some(sleep_dur) = Duration::from_secs(1).checked_sub(start_beacon_fetch.elapsed()) { - tokio::select! { - _ = token.cancelled() => { - return Err(CliError::Other("timeout/interrupted".to_string())); - } - _ = tokio::time::sleep(sleep_dur) => {} - } + tokio::time::sleep(sleep_dur).await; } continue; @@ -684,15 +638,13 @@ async fn create_mev_block( )) })?; - let rtt_submit_block = tokio::select! { - _ = token.cancelled() => return Err(CliError::Other("timeout/interrupted".to_string())), - r = request_rtt( - format!("{target}/eth/v1/builder/blinded_blocks"), - Method::POST, - Some(payload_json), - StatusCode::BAD_REQUEST, - ) => r? - }; + let rtt_submit_block = request_rtt( + format!("{target}/eth/v1/builder/blinded_blocks"), + Method::POST, + Some(payload_json), + StatusCode::BAD_REQUEST, + ) + .await?; Ok(rtt_get_header .checked_add(rtt_submit_block) From 9c690e39acdf7dad391bbf5bf0edc9b4c27fc101 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 17:57:56 -0300 Subject: [PATCH 09/10] Pass main CT --- crates/cli/src/commands/test/mev.rs | 18 ++++++++++++------ crates/cli/src/main.rs | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index 079c02a7..eba8b857 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -112,7 +112,11 @@ impl TestCaseMev { } /// Runs the MEV relay tests. -pub async fn run(args: TestMevArgs, writer: &mut dyn Write) -> Result { +pub async fn run( + args: TestMevArgs, + writer: &mut dyn Write, + token: &CancellationToken, +) -> Result { must_output_to_file_on_quiet(args.test_config.quiet, &args.test_config.output_json)?; // Validate flag combinations. @@ -140,11 +144,13 @@ pub async fn run(args: TestMevArgs, writer: &mut dyn Write) -> Result ExitResult { .await .map(|_| ()) } - TestCommands::Mev(args) => commands::test::mev::run(args, &mut stdout) + TestCommands::Mev(args) => commands::test::mev::run(args, &mut stdout, &ct) .await .map(|_| ()), TestCommands::Infra(args) => commands::test::infra::run(args, &mut stdout) From 0c11a442189190abd5862f9c52d94359e45e411c Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 3 Apr 2026 18:08:02 -0300 Subject: [PATCH 10/10] Fix clippy lints - Workaround for false positive --- crates/cli/src/commands/test/mev.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/cli/src/commands/test/mev.rs b/crates/cli/src/commands/test/mev.rs index eba8b857..52607ee9 100644 --- a/crates/cli/src/commands/test/mev.rs +++ b/crates/cli/src/commands/test/mev.rs @@ -224,7 +224,8 @@ async fn test_single_mev( ) -> Vec { let mut join_set = JoinSet::new(); - for test_case in queued_tests.to_owned() { + let queued_tests = queued_tests.to_vec(); + for test_case in queued_tests { let token = token.clone(); let conf = conf.clone(); let target = target.to_string();