From 0e8013c42a0ca2bf8441b83c2bdc69dce569b071 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Sun, 22 Feb 2026 02:22:30 +0100 Subject: [PATCH 1/3] fix: wait for valid blockhash --- magicblock-api/src/magic_validator.rs | 3 +- magicblock-task-scheduler/src/service.rs | 20 ++- .../test-ledger-restore/src/lib.rs | 2 +- .../tests/16_cranks_persists.rs | 116 ++++++++++++++++++ 4 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 test-integration/test-ledger-restore/tests/16_cranks_persists.rs diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index aca45c5eb..21fdd2da2 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -5,7 +5,7 @@ use std::{ Arc, }, thread, - time::Instant, + time::{Duration, Instant}, }; use magicblock_account_cloner::{ @@ -335,6 +335,7 @@ impl MagicValidator { .take() .expect("tasks_service should be initialized"), ledger.latest_block().clone(), + Duration::from_millis(config.ledger.block_time_ms()), token.clone(), )?; log_timing("startup", "task_scheduler_init", step_start); diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 024a0063e..84b4032d3 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -49,6 +49,8 @@ pub struct TaskSchedulerService { token: CancellationToken, /// Minimum interval between task executions min_interval: Duration, + /// Slot interval of the validator + slot_interval: Duration, } enum ProcessingOutcome { @@ -69,6 +71,7 @@ impl TaskSchedulerService { rpc_url: String, scheduled_tasks: ScheduledTasksRx, block: LatestBlock, + slot_interval: Duration, token: CancellationToken, ) -> TaskSchedulerResult { if config.reset { @@ -96,12 +99,14 @@ impl TaskSchedulerService { tx_counter: AtomicU64::default(), token, min_interval: config.min_interval, + slot_interval, }) } pub async fn start( mut self, ) -> TaskSchedulerResult>> { + // Reschedule all tasks that are due let tasks = self.db.get_tasks().await?; let now = chrono::Utc::now().timestamp_millis(); debug!( @@ -124,8 +129,13 @@ impl TaskSchedulerService { let next_execution = task.last_execution_millis + task.execution_interval_millis; + // Earliest reschedule time is 2 slot interval. + // This avoids, scheduling before the first blockhash is produced on restart. let timeout = Duration::from_millis( - next_execution.saturating_sub(now).max(0) as u64, + next_execution + .saturating_sub(now) + .max(2 * self.slot_interval.as_millis() as i64) + as u64, ); let task_id = task.id; let key = self.task_queue.insert(task, timeout); @@ -364,7 +374,10 @@ fn is_valid_task_interval(interval: i64) -> bool { #[cfg(test)] mod tests { - use magicblock_program::args::ScheduleTaskRequest; + use magicblock_program::{ + args::ScheduleTaskRequest, + validator::generate_validator_authority_if_needed, + }; use solana_pubkey::Pubkey; use tokio::{sync::mpsc, time::timeout}; @@ -373,6 +386,7 @@ mod tests { #[tokio::test] async fn test_schedule_invalid_tasks() { magicblock_core::logger::init_for_tests(); + generate_validator_authority_if_needed(); let (tx, rx) = mpsc::unbounded_channel(); let db = SchedulerDatabase::new(":memory:").unwrap(); @@ -386,6 +400,7 @@ mod tests { tx_counter: AtomicU64::default(), token: CancellationToken::new(), min_interval: Duration::from_millis(1000), + slot_interval: Duration::from_millis(1000), scheduled_tasks: rx, }; @@ -466,6 +481,7 @@ mod tests { tx_counter: AtomicU64::default(), token: CancellationToken::new(), min_interval: Duration::from_millis(1000), + slot_interval: Duration::from_millis(1000), scheduled_tasks: rx, }; diff --git a/test-integration/test-ledger-restore/src/lib.rs b/test-integration/test-ledger-restore/src/lib.rs index 084ab416e..bff15fe01 100644 --- a/test-integration/test-ledger-restore/src/lib.rs +++ b/test-integration/test-ledger-restore/src/lib.rs @@ -150,7 +150,7 @@ pub fn setup_validator_with_local_remote_and_resume_strategy( accountsdb: accountsdb_config.clone(), programs, task_scheduler: TaskSchedulerConfig { - reset: true, + reset: reset_ledger, ..Default::default() }, lifecycle: LifecycleMode::Ephemeral, diff --git a/test-integration/test-ledger-restore/tests/16_cranks_persists.rs b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs new file mode 100644 index 000000000..621a0f42a --- /dev/null +++ b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs @@ -0,0 +1,116 @@ +use std::{path::Path, process::Child}; + +use cleanass::assert; +use integration_test_tools::{ + expect, loaded_accounts::LoadedAccounts, tmpdir::resolve_tmp_dir, + validator::cleanup, +}; +use program_flexi_counter::instruction::create_schedule_task_ix; +use solana_sdk::{signature::Keypair, signer::Signer}; +use test_kit::init_logger; +use test_ledger_restore::{ + confirm_tx_with_payer_ephem, fetch_counter_ephem, + init_and_delegate_counter_and_payer, setup_validator_with_local_remote, + setup_validator_with_local_remote_and_resume_strategy, + wait_for_ledger_persist, TMP_DIR_LEDGER, +}; +use tracing::*; + +#[test] +fn test_crank_persistence() { + init_logger!(); + let (_tmpdir, ledger_path) = resolve_tmp_dir(TMP_DIR_LEDGER); + + let (mut validator, payer, count) = write(&ledger_path); + validator.kill().unwrap(); + + let mut validator = read(&ledger_path, &payer, count); + validator.kill().unwrap(); +} + +pub fn write(ledger_path: &Path) -> (Child, Keypair, u64) { + let (_, mut validator, ctx) = setup_validator_with_local_remote( + ledger_path, + None, + true, + false, + &LoadedAccounts::with_delegation_program_test_authority(), + ); + + let (payer, _) = + init_and_delegate_counter_and_payer(&ctx, &mut validator, "COUNTER"); + + // Wait for account to be delegated + // expect!(ctx.wait_for_delta_slot_ephem(10), validator); + + // Schedule a task + let task_id = 1; + let execution_interval_millis = 50; + let iterations = 1000; + let ix = create_schedule_task_ix( + payer.pubkey(), + task_id, + execution_interval_millis, + iterations, + false, + false, + ); + confirm_tx_with_payer_ephem(ix, &payer, &ctx, &mut validator); + + // Wait for the task to be scheduled and executed + expect!(ctx.wait_for_delta_slot_ephem(1), validator); + + // Check that the counter was incremented + let counter_account = + fetch_counter_ephem(&ctx, &payer.pubkey(), &mut validator); + assert!( + counter_account.count > 0, + cleanup(&mut validator), + "counter.count: {}", + counter_account.count + ); + + // Wait more to be sure the ledger is persisted + wait_for_ledger_persist(&ctx, &mut validator); + debug!("✅ Ledger persisted"); + + (validator, payer, counter_account.count) +} + +pub fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child { + let (_, mut validator, ctx) = + setup_validator_with_local_remote_and_resume_strategy( + ledger_path, + None, + false, + false, + &LoadedAccounts::with_delegation_program_test_authority(), + ); + + // Check that the counter persisted its value + let current_count = + fetch_counter_ephem(&ctx, &kp.pubkey(), &mut validator).count; + assert!( + current_count >= count, + cleanup(&mut validator), + "counter.count: {} < {}", + current_count, + count + ); + + // Wait for the crank to execute + expect!(ctx.wait_for_delta_slot_ephem(3), validator); + + // Check that the count increased + let new_count = + fetch_counter_ephem(&ctx, &kp.pubkey(), &mut validator).count; + assert!( + new_count > current_count, + cleanup(&mut validator), + "counter.count: {} <= {}", + new_count, + current_count + ); + + validator +} From e9888803387ee0430bd038eed710767ce8056d66 Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Sun, 22 Feb 2026 09:51:50 +0100 Subject: [PATCH 2/3] fix: increase wait time --- .../test-ledger-restore/tests/16_cranks_persists.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test-integration/test-ledger-restore/tests/16_cranks_persists.rs b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs index 621a0f42a..ceabf073c 100644 --- a/test-integration/test-ledger-restore/tests/16_cranks_persists.rs +++ b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs @@ -40,9 +40,6 @@ pub fn write(ledger_path: &Path) -> (Child, Keypair, u64) { let (payer, _) = init_and_delegate_counter_and_payer(&ctx, &mut validator, "COUNTER"); - // Wait for account to be delegated - // expect!(ctx.wait_for_delta_slot_ephem(10), validator); - // Schedule a task let task_id = 1; let execution_interval_millis = 50; @@ -99,7 +96,7 @@ pub fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child { ); // Wait for the crank to execute - expect!(ctx.wait_for_delta_slot_ephem(3), validator); + expect!(ctx.wait_for_delta_slot_ephem(5), validator); // Check that the count increased let new_count = From b05134655c1b4c29f91a236c7fbfe09a6e300a3e Mon Sep 17 00:00:00 2001 From: Dodecahedr0x Date: Sun, 22 Feb 2026 11:00:47 +0100 Subject: [PATCH 3/3] feat: increase wait time --- .../test-ledger-restore/tests/16_cranks_persists.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test-integration/test-ledger-restore/tests/16_cranks_persists.rs b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs index ceabf073c..12e9a4700 100644 --- a/test-integration/test-ledger-restore/tests/16_cranks_persists.rs +++ b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs @@ -28,7 +28,7 @@ fn test_crank_persistence() { validator.kill().unwrap(); } -pub fn write(ledger_path: &Path) -> (Child, Keypair, u64) { +fn write(ledger_path: &Path) -> (Child, Keypair, u64) { let (_, mut validator, ctx) = setup_validator_with_local_remote( ledger_path, None, @@ -55,7 +55,7 @@ pub fn write(ledger_path: &Path) -> (Child, Keypair, u64) { confirm_tx_with_payer_ephem(ix, &payer, &ctx, &mut validator); // Wait for the task to be scheduled and executed - expect!(ctx.wait_for_delta_slot_ephem(1), validator); + expect!(ctx.wait_for_delta_slot_ephem(3), validator); // Check that the counter was incremented let counter_account = @@ -74,7 +74,7 @@ pub fn write(ledger_path: &Path) -> (Child, Keypair, u64) { (validator, payer, counter_account.count) } -pub fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child { +fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child { let (_, mut validator, ctx) = setup_validator_with_local_remote_and_resume_strategy( ledger_path, @@ -96,7 +96,7 @@ pub fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child { ); // Wait for the crank to execute - expect!(ctx.wait_for_delta_slot_ephem(5), validator); + expect!(ctx.wait_for_delta_slot_ephem(10), validator); // Check that the count increased let new_count =