diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index aca45c5eb..21fdd2da2 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -5,7 +5,7 @@ use std::{ Arc, }, thread, - time::Instant, + time::{Duration, Instant}, }; use magicblock_account_cloner::{ @@ -335,6 +335,7 @@ impl MagicValidator { .take() .expect("tasks_service should be initialized"), ledger.latest_block().clone(), + Duration::from_millis(config.ledger.block_time_ms()), token.clone(), )?; log_timing("startup", "task_scheduler_init", step_start); diff --git a/magicblock-task-scheduler/src/service.rs b/magicblock-task-scheduler/src/service.rs index 024a0063e..84b4032d3 100644 --- a/magicblock-task-scheduler/src/service.rs +++ b/magicblock-task-scheduler/src/service.rs @@ -49,6 +49,8 @@ pub struct TaskSchedulerService { token: CancellationToken, /// Minimum interval between task executions min_interval: Duration, + /// Slot interval of the validator + slot_interval: Duration, } enum ProcessingOutcome { @@ -69,6 +71,7 @@ impl TaskSchedulerService { rpc_url: String, scheduled_tasks: ScheduledTasksRx, block: LatestBlock, + slot_interval: Duration, token: CancellationToken, ) -> TaskSchedulerResult { if config.reset { @@ -96,12 +99,14 @@ impl TaskSchedulerService { tx_counter: AtomicU64::default(), token, min_interval: config.min_interval, + slot_interval, }) } pub async fn start( mut self, ) -> TaskSchedulerResult>> { + // Reschedule all tasks that are due let tasks = self.db.get_tasks().await?; let now = chrono::Utc::now().timestamp_millis(); debug!( @@ -124,8 +129,13 @@ impl TaskSchedulerService { let next_execution = task.last_execution_millis + task.execution_interval_millis; + // Earliest reschedule time is 2 slot interval. + // This avoids, scheduling before the first blockhash is produced on restart. let timeout = Duration::from_millis( - next_execution.saturating_sub(now).max(0) as u64, + next_execution + .saturating_sub(now) + .max(2 * self.slot_interval.as_millis() as i64) + as u64, ); let task_id = task.id; let key = self.task_queue.insert(task, timeout); @@ -364,7 +374,10 @@ fn is_valid_task_interval(interval: i64) -> bool { #[cfg(test)] mod tests { - use magicblock_program::args::ScheduleTaskRequest; + use magicblock_program::{ + args::ScheduleTaskRequest, + validator::generate_validator_authority_if_needed, + }; use solana_pubkey::Pubkey; use tokio::{sync::mpsc, time::timeout}; @@ -373,6 +386,7 @@ mod tests { #[tokio::test] async fn test_schedule_invalid_tasks() { magicblock_core::logger::init_for_tests(); + generate_validator_authority_if_needed(); let (tx, rx) = mpsc::unbounded_channel(); let db = SchedulerDatabase::new(":memory:").unwrap(); @@ -386,6 +400,7 @@ mod tests { tx_counter: AtomicU64::default(), token: CancellationToken::new(), min_interval: Duration::from_millis(1000), + slot_interval: Duration::from_millis(1000), scheduled_tasks: rx, }; @@ -466,6 +481,7 @@ mod tests { tx_counter: AtomicU64::default(), token: CancellationToken::new(), min_interval: Duration::from_millis(1000), + slot_interval: Duration::from_millis(1000), scheduled_tasks: rx, }; diff --git a/test-integration/test-ledger-restore/src/lib.rs b/test-integration/test-ledger-restore/src/lib.rs index 084ab416e..bff15fe01 100644 --- a/test-integration/test-ledger-restore/src/lib.rs +++ b/test-integration/test-ledger-restore/src/lib.rs @@ -150,7 +150,7 @@ pub fn setup_validator_with_local_remote_and_resume_strategy( accountsdb: accountsdb_config.clone(), programs, task_scheduler: TaskSchedulerConfig { - reset: true, + reset: reset_ledger, ..Default::default() }, lifecycle: LifecycleMode::Ephemeral, diff --git a/test-integration/test-ledger-restore/tests/16_cranks_persists.rs b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs new file mode 100644 index 000000000..12e9a4700 --- /dev/null +++ b/test-integration/test-ledger-restore/tests/16_cranks_persists.rs @@ -0,0 +1,113 @@ +use std::{path::Path, process::Child}; + +use cleanass::assert; +use integration_test_tools::{ + expect, loaded_accounts::LoadedAccounts, tmpdir::resolve_tmp_dir, + validator::cleanup, +}; +use program_flexi_counter::instruction::create_schedule_task_ix; +use solana_sdk::{signature::Keypair, signer::Signer}; +use test_kit::init_logger; +use test_ledger_restore::{ + confirm_tx_with_payer_ephem, fetch_counter_ephem, + init_and_delegate_counter_and_payer, setup_validator_with_local_remote, + setup_validator_with_local_remote_and_resume_strategy, + wait_for_ledger_persist, TMP_DIR_LEDGER, +}; +use tracing::*; + +#[test] +fn test_crank_persistence() { + init_logger!(); + let (_tmpdir, ledger_path) = resolve_tmp_dir(TMP_DIR_LEDGER); + + let (mut validator, payer, count) = write(&ledger_path); + validator.kill().unwrap(); + + let mut validator = read(&ledger_path, &payer, count); + validator.kill().unwrap(); +} + +fn write(ledger_path: &Path) -> (Child, Keypair, u64) { + let (_, mut validator, ctx) = setup_validator_with_local_remote( + ledger_path, + None, + true, + false, + &LoadedAccounts::with_delegation_program_test_authority(), + ); + + let (payer, _) = + init_and_delegate_counter_and_payer(&ctx, &mut validator, "COUNTER"); + + // Schedule a task + let task_id = 1; + let execution_interval_millis = 50; + let iterations = 1000; + let ix = create_schedule_task_ix( + payer.pubkey(), + task_id, + execution_interval_millis, + iterations, + false, + false, + ); + confirm_tx_with_payer_ephem(ix, &payer, &ctx, &mut validator); + + // Wait for the task to be scheduled and executed + expect!(ctx.wait_for_delta_slot_ephem(3), validator); + + // Check that the counter was incremented + let counter_account = + fetch_counter_ephem(&ctx, &payer.pubkey(), &mut validator); + assert!( + counter_account.count > 0, + cleanup(&mut validator), + "counter.count: {}", + counter_account.count + ); + + // Wait more to be sure the ledger is persisted + wait_for_ledger_persist(&ctx, &mut validator); + debug!("✅ Ledger persisted"); + + (validator, payer, counter_account.count) +} + +fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child { + let (_, mut validator, ctx) = + setup_validator_with_local_remote_and_resume_strategy( + ledger_path, + None, + false, + false, + &LoadedAccounts::with_delegation_program_test_authority(), + ); + + // Check that the counter persisted its value + let current_count = + fetch_counter_ephem(&ctx, &kp.pubkey(), &mut validator).count; + assert!( + current_count >= count, + cleanup(&mut validator), + "counter.count: {} < {}", + current_count, + count + ); + + // Wait for the crank to execute + expect!(ctx.wait_for_delta_slot_ephem(10), validator); + + // Check that the count increased + let new_count = + fetch_counter_ephem(&ctx, &kp.pubkey(), &mut validator).count; + assert!( + new_count > current_count, + cleanup(&mut validator), + "counter.count: {} <= {}", + new_count, + current_count + ); + + validator +}