-
Notifications
You must be signed in to change notification settings - Fork 1
feat(cli): added cli validator tests #286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 9 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
b45292d
feat: added cli validator tests
PoulavBhowmick03 ccb3c68
Merge branch 'main' into feat/cmd_validator_tests
PoulavBhowmick03 2bc8118
Merge remote-tracking branch 'upstream/main' into feat/cmd_validator_…
PoulavBhowmick03 2ee355b
suggestions
PoulavBhowmick03 073a39d
clippy
PoulavBhowmick03 edffe34
Merge remote-tracking branch 'origin/feat/cmd_validator_tests' into f…
PoulavBhowmick03 8c0f4cf
Merge remote-tracking branch 'upstream/main' into feat/cmd_validator_…
PoulavBhowmick03 3553cc5
simplify timeout iteration
PoulavBhowmick03 3066fd5
Merge branch 'main' of https://github.com/NethermindEth/pluto into po…
PoulavBhowmick03 3d9cb4a
fix(cli): address validator test PR review comments
varex83agent 88fb5cd
fix(deny): add advisory ignores for transitive deps
varex83agent 977bc0d
Merge branch 'main' into poulav/cmd_validator_tests
varex83agent 10b4dfa
fix(cli): address emlautarom1 review comments on validator tests
varex83agent 47abf92
fix linter
varex83 8ec9c69
fix: stop ping_continuously loop on dial error to match charon behavior
varex83agent File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,56 @@ | ||
| //! Validator client connectivity tests. | ||
|
|
||
| use super::{TestCategoryResult, TestConfigArgs}; | ||
| use crate::error::Result; | ||
| use clap::Args; | ||
| use std::{io::Write, time::Duration}; | ||
|
|
||
| use clap::Args; | ||
| use rand::Rng; | ||
| use tokio::{ | ||
| net::TcpStream, | ||
| sync::mpsc, | ||
| time::{Instant, timeout}, | ||
| }; | ||
|
|
||
| use super::{ | ||
| AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict, | ||
| calculate_score, evaluate_highest_rtt, evaluate_rtt, publish_result_to_obol_api, | ||
| write_result_to_file, write_result_to_writer, | ||
| }; | ||
| use crate::{duration::Duration as CliDuration, error::Result}; | ||
|
|
||
| // Thresholds (from Go implementation) | ||
| const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50); | ||
| const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240); | ||
| const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50); | ||
| const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240); | ||
|
|
||
| /// Validator test cases. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||
| pub enum ValidatorTestCase { | ||
| Ping, | ||
| PingMeasure, | ||
| PingLoad, | ||
| } | ||
|
|
||
| impl ValidatorTestCase { | ||
| /// Returns all validator test cases. | ||
| pub fn all() -> &'static [ValidatorTestCase] { | ||
| &[ | ||
| ValidatorTestCase::Ping, | ||
| ValidatorTestCase::PingMeasure, | ||
| ValidatorTestCase::PingLoad, | ||
| ] | ||
| } | ||
|
|
||
| /// Returns the test name as a string. | ||
| pub fn name(&self) -> &'static str { | ||
| match self { | ||
| ValidatorTestCase::Ping => "Ping", | ||
| ValidatorTestCase::PingMeasure => "PingMeasure", | ||
| ValidatorTestCase::PingLoad => "PingLoad", | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Arguments for the validator test command. | ||
| #[derive(Args, Clone, Debug)] | ||
| pub struct TestValidatorArgs { | ||
|
|
@@ -30,10 +76,233 @@ pub struct TestValidatorArgs { | |
| } | ||
|
|
||
| /// Runs the validator client tests. | ||
| pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result<TestCategoryResult> { | ||
| // TODO: Implement validator tests | ||
| // - Ping | ||
| // - PingMeasure | ||
| // - PingLoad | ||
| unimplemented!("validator test not yet implemented") | ||
| pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result<TestCategoryResult> { | ||
| tracing::info!("Starting validator client test"); | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
|
|
||
| let start_time = Instant::now(); | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
|
|
||
| // Get and filter test cases | ||
| let queued_tests: Vec<ValidatorTestCase> = if let Some(ref filter) = args.test_config.test_cases | ||
| { | ||
| ValidatorTestCase::all() | ||
| .iter() | ||
| .filter(|tc| filter.contains(&tc.name().to_string())) | ||
| .copied() | ||
| .collect() | ||
| } else { | ||
| ValidatorTestCase::all().to_vec() | ||
| }; | ||
|
|
||
| if queued_tests.is_empty() { | ||
| return Err(crate::error::CliError::Other( | ||
| "test case not supported".into(), | ||
| )); | ||
| } | ||
|
|
||
| // Run tests with timeout | ||
| let test_results = run_tests_with_timeout(&args, &queued_tests).await; | ||
|
|
||
| let score = calculate_score(&test_results); | ||
|
|
||
| let mut res = TestCategoryResult::new(TestCategory::Validator); | ||
| res.targets.insert(args.api_address.clone(), test_results); | ||
| res.execution_time = Some(CliDuration::new(start_time.elapsed())); | ||
| res.score = Some(score); | ||
|
|
||
| if !args.test_config.quiet { | ||
| write_result_to_writer(&res, writer)?; | ||
| } | ||
|
|
||
| if !args.test_config.output_json.is_empty() { | ||
| write_result_to_file(&res, args.test_config.output_json.as_ref()).await?; | ||
| } | ||
|
|
||
| if args.test_config.publish { | ||
| let all = AllCategoriesResult { | ||
| validator: Some(res.clone()), | ||
| ..Default::default() | ||
| }; | ||
| publish_result_to_obol_api( | ||
| all, | ||
| &args.test_config.publish_addr, | ||
| &args.test_config.publish_private_key_file, | ||
| ) | ||
| .await?; | ||
| } | ||
|
|
||
| Ok(res) | ||
| } | ||
|
|
||
| /// Timeout error message | ||
| const ERR_TIMEOUT_INTERRUPTED: &str = "timeout"; | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
|
|
||
| /// Runs tests with timeout, keeping completed tests on timeout. | ||
| async fn run_tests_with_timeout( | ||
| args: &TestValidatorArgs, | ||
| tests: &[ValidatorTestCase], | ||
| ) -> Vec<TestResult> { | ||
| let mut results = Vec::new(); | ||
| let timeout_deadline = tokio::time::Instant::now() | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| .checked_add(args.test_config.timeout) | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| .expect("timeout overflow"); | ||
|
|
||
| for &test_case in tests { | ||
| let remaining = timeout_deadline.saturating_duration_since(tokio::time::Instant::now()); | ||
|
|
||
|
emlautarom1 marked this conversation as resolved.
|
||
| match tokio::time::timeout(remaining, run_single_test(args, test_case)).await { | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| Ok(result) => results.push(result), | ||
| Err(_) => { | ||
| results.push( | ||
| TestResult::new(test_case.name()) | ||
| .fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)), | ||
| ); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| results | ||
| } | ||
|
PoulavBhowmick03 marked this conversation as resolved.
|
||
|
|
||
| /// Runs a single test case. | ||
| async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult { | ||
| match test_case { | ||
| ValidatorTestCase::Ping => ping_test(args).await, | ||
| ValidatorTestCase::PingMeasure => ping_measure_test(args).await, | ||
| ValidatorTestCase::PingLoad => ping_load_test(args).await, | ||
| } | ||
| } | ||
|
|
||
| async fn ping_test(args: &TestValidatorArgs) -> TestResult { | ||
| let mut result = TestResult::new(ValidatorTestCase::Ping.name()); | ||
|
|
||
| match timeout( | ||
|
varex83agent marked this conversation as resolved.
|
||
| Duration::from_secs(1), | ||
| TcpStream::connect(&args.api_address), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(Ok(_conn)) => { | ||
| result.verdict = TestVerdict::Ok; | ||
| } | ||
| Ok(Err(e)) => { | ||
| return result.fail(e); | ||
| } | ||
| Err(_) => { | ||
| return result.fail(std::io::Error::new( | ||
| std::io::ErrorKind::TimedOut, | ||
| "connection timeout", | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| result | ||
| } | ||
|
|
||
| async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult { | ||
| let mut result = TestResult::new(ValidatorTestCase::PingMeasure.name()); | ||
| let before = Instant::now(); | ||
|
|
||
| match timeout( | ||
| Duration::from_secs(1), | ||
| TcpStream::connect(&args.api_address), | ||
| ) | ||
| .await | ||
| { | ||
| Ok(Ok(_conn)) => { | ||
| let rtt = before.elapsed(); | ||
| result = evaluate_rtt(rtt, result, THRESHOLD_MEASURE_AVG, THRESHOLD_MEASURE_POOR); | ||
| } | ||
| Ok(Err(e)) => { | ||
| return result.fail(e); | ||
| } | ||
| Err(_) => { | ||
| return result.fail(std::io::Error::new( | ||
| std::io::ErrorKind::TimedOut, | ||
| "connection timeout", | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| result | ||
| } | ||
|
|
||
| async fn ping_load_test(args: &TestValidatorArgs) -> TestResult { | ||
| tracing::info!( | ||
| duration = ?args.load_test_duration, | ||
| target = %args.api_address, | ||
| "Running ping load tests..." | ||
| ); | ||
|
|
||
| let mut result = TestResult::new(ValidatorTestCase::PingLoad.name()); | ||
|
|
||
| let (tx, mut rx) = mpsc::channel::<Duration>(i16::MAX as usize); | ||
| let address = args.api_address.clone(); | ||
| let duration = args.load_test_duration; | ||
|
|
||
| let handle = tokio::spawn(async move { | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| let start = Instant::now(); | ||
| let mut interval = tokio::time::interval(Duration::from_secs(1)); | ||
| let mut workers = tokio::task::JoinSet::new(); | ||
|
|
||
| interval.tick().await; | ||
| while start.elapsed() < duration { | ||
| interval.tick().await; | ||
|
|
||
| let tx = tx.clone(); | ||
| let addr = address.clone(); | ||
| let remaining = duration.saturating_sub(start.elapsed()); | ||
|
|
||
| workers.spawn(async move { | ||
| ping_continuously(addr, tx, remaining).await; | ||
| }); | ||
| } | ||
|
|
||
| // Drop the scheduler's clone so only workers hold senders | ||
| drop(tx); | ||
|
|
||
| // Wait for all spawned ping workers to finish | ||
| while workers.join_next().await.is_some() {} | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| }); | ||
|
|
||
| let _ = handle.await; | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
|
|
||
| // All senders dropped, collect all RTTs | ||
| rx.close(); | ||
| let mut rtts = Vec::new(); | ||
| while let Some(rtt) = rx.recv().await { | ||
| rtts.push(rtt); | ||
| } | ||
|
|
||
| tracing::info!(target = %args.api_address, "Ping load tests finished"); | ||
|
|
||
| result = evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR); | ||
|
|
||
| result | ||
| } | ||
|
|
||
| async fn ping_continuously(address: String, tx: mpsc::Sender<Duration>, max_duration: Duration) { | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| let start = Instant::now(); | ||
|
|
||
| while start.elapsed() < max_duration { | ||
| let before = Instant::now(); | ||
|
|
||
| match timeout(Duration::from_secs(1), TcpStream::connect(&address)).await { | ||
| Ok(Ok(conn)) => { | ||
| let rtt = before.elapsed(); | ||
| if tx.send(rtt).await.is_err() { | ||
| drop(conn); | ||
|
varex83agent marked this conversation as resolved.
Outdated
|
||
| return; | ||
| } | ||
| } | ||
| Ok(Err(e)) => { | ||
| tracing::warn!(target = %address, error = ?e, "Ping connection attempt failed during load test"); | ||
| } | ||
| Err(e) => { | ||
| tracing::warn!(target = %address, error = ?e, "Ping connection attempt timed out during load test"); | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In charon, when dial has error, it will stop the loop This can affect the score later |
||
| } | ||
| let sleep_ms = rand::thread_rng().gen_range(0..100); | ||
| tokio::time::sleep(Duration::from_millis(sleep_ms)).await; | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.