3 changes: 2 additions & 1 deletion magicblock-api/src/magic_validator.rs
@@ -5,7 +5,7 @@ use std::{
         Arc,
     },
     thread,
-    time::Instant,
+    time::{Duration, Instant},
 };
 
 use magicblock_account_cloner::{
@@ -335,6 +335,7 @@ impl MagicValidator {
                 .take()
                 .expect("tasks_service should be initialized"),
             ledger.latest_block().clone(),
+            Duration::from_millis(config.ledger.block_time_ms()),
             token.clone(),
         )?;
         log_timing("startup", "task_scheduler_init", step_start);
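Here the validator passes its configured ledger block time to the task scheduler as the slot interval; the restart clamp below is expressed in multiples of this value.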
20 changes: 18 additions & 2 deletions magicblock-task-scheduler/src/service.rs
@@ -49,6 +49,8 @@ pub struct TaskSchedulerService {
     token: CancellationToken,
     /// Minimum interval between task executions
     min_interval: Duration,
+    /// Slot interval of the validator
+    slot_interval: Duration,
 }
 
 enum ProcessingOutcome {
@@ -69,6 +71,7 @@ impl TaskSchedulerService {
         rpc_url: String,
         scheduled_tasks: ScheduledTasksRx,
         block: LatestBlock,
+        slot_interval: Duration,
         token: CancellationToken,
     ) -> TaskSchedulerResult<Self> {
         if config.reset {
@@ -96,12 +99,14 @@
             tx_counter: AtomicU64::default(),
             token,
             min_interval: config.min_interval,
+            slot_interval,
         })
     }
 
     pub async fn start(
         mut self,
     ) -> TaskSchedulerResult<JoinHandle<TaskSchedulerResult<()>>> {
+        // Reschedule all tasks that are due
         let tasks = self.db.get_tasks().await?;
         let now = chrono::Utc::now().timestamp_millis();
         debug!(
@@ -124,8 +129,13 @@
 
             let next_execution =
                 task.last_execution_millis + task.execution_interval_millis;
+            // The earliest reschedule time is 2 slot intervals. This avoids
+            // scheduling before the first blockhash is produced on restart.
             let timeout = Duration::from_millis(
-                next_execution.saturating_sub(now).max(0) as u64,
+                next_execution
+                    .saturating_sub(now)
+                    .max(2 * self.slot_interval.as_millis() as i64)
+                    as u64,
             );
             let task_id = task.id;
             let key = self.task_queue.insert(task, timeout);
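To see what the new clamp does, here is a standalone sketch with made-up numbers (the 50 ms slot interval and the millisecond timestamps are illustrative assumptions, not values taken from the config above):

use std::time::Duration;

// Standalone sketch of the reschedule clamp above, with illustrative values.
fn reschedule_timeout(
    last_execution_millis: i64,
    execution_interval_millis: i64,
    now_millis: i64,
    slot_interval: Duration,
) -> Duration {
    let next_execution = last_execution_millis + execution_interval_millis;
    // Never fire earlier than two slot intervals after startup, so the
    // first blockhash exists before the task runs.
    Duration::from_millis(
        next_execution
            .saturating_sub(now_millis)
            .max(2 * slot_interval.as_millis() as i64) as u64,
    )
}

fn main() {
    let slot = Duration::from_millis(50);
    // A task that was due 10 s ago previously got a 0 ms timeout; it now
    // waits two slots (100 ms) so the validator can produce a blockhash.
    assert_eq!(
        reschedule_timeout(0, 5_000, 15_000, slot),
        Duration::from_millis(100)
    );
    // A task due 3 s in the future is unaffected by the clamp.
    assert_eq!(
        reschedule_timeout(10_000, 8_000, 15_000, slot),
        Duration::from_millis(3_000)
    );
}

Previously the timeout floored at 0 ms, so every overdue task fired immediately on startup, before the first blockhash existed.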
@@ -364,7 +374,10 @@ fn is_valid_task_interval(interval: i64) -> bool {
 
 #[cfg(test)]
 mod tests {
-    use magicblock_program::args::ScheduleTaskRequest;
+    use magicblock_program::{
+        args::ScheduleTaskRequest,
+        validator::generate_validator_authority_if_needed,
+    };
     use solana_pubkey::Pubkey;
     use tokio::{sync::mpsc, time::timeout};
 
@@ -373,6 +386,7 @@
     #[tokio::test]
     async fn test_schedule_invalid_tasks() {
         magicblock_core::logger::init_for_tests();
+        generate_validator_authority_if_needed();
 
         let (tx, rx) = mpsc::unbounded_channel();
         let db = SchedulerDatabase::new(":memory:").unwrap();
@@ -386,6 +400,7 @@
             tx_counter: AtomicU64::default(),
             token: CancellationToken::new(),
             min_interval: Duration::from_millis(1000),
+            slot_interval: Duration::from_millis(1000),
             scheduled_tasks: rx,
         };
 
@@ -466,6 +481,7 @@ mod tests {
             tx_counter: AtomicU64::default(),
             token: CancellationToken::new(),
             min_interval: Duration::from_millis(1000),
+            slot_interval: Duration::from_millis(1000),
             scheduled_tasks: rx,
         };
 
2 changes: 1 addition & 1 deletion test-integration/test-ledger-restore/src/lib.rs
@@ -150,7 +150,7 @@ pub fn setup_validator_with_local_remote_and_resume_strategy(
         accountsdb: accountsdb_config.clone(),
         programs,
         task_scheduler: TaskSchedulerConfig {
-            reset: true,
+            reset: reset_ledger,
             ..Default::default()
         },
         lifecycle: LifecycleMode::Ephemeral,
113 changes: 113 additions & 0 deletions test-integration/test-ledger-restore/tests/16_cranks_persists.rs
@@ -0,0 +1,113 @@
use std::{path::Path, process::Child};

use cleanass::assert;
use integration_test_tools::{
expect, loaded_accounts::LoadedAccounts, tmpdir::resolve_tmp_dir,
validator::cleanup,
};
use program_flexi_counter::instruction::create_schedule_task_ix;
use solana_sdk::{signature::Keypair, signer::Signer};
use test_kit::init_logger;
use test_ledger_restore::{
confirm_tx_with_payer_ephem, fetch_counter_ephem,
init_and_delegate_counter_and_payer, setup_validator_with_local_remote,
setup_validator_with_local_remote_and_resume_strategy,
wait_for_ledger_persist, TMP_DIR_LEDGER,
};
use tracing::*;

#[test]
fn test_crank_persistence() {
init_logger!();
let (_tmpdir, ledger_path) = resolve_tmp_dir(TMP_DIR_LEDGER);

let (mut validator, payer, count) = write(&ledger_path);
validator.kill().unwrap();

let mut validator = read(&ledger_path, &payer, count);
validator.kill().unwrap();
}

fn write(ledger_path: &Path) -> (Child, Keypair, u64) {
let (_, mut validator, ctx) = setup_validator_with_local_remote(
ledger_path,
None,
true,
false,
&LoadedAccounts::with_delegation_program_test_authority(),
);

let (payer, _) =
init_and_delegate_counter_and_payer(&ctx, &mut validator, "COUNTER");

// Schedule a task
let task_id = 1;
let execution_interval_millis = 50;
let iterations = 1000;
let ix = create_schedule_task_ix(
payer.pubkey(),
task_id,
execution_interval_millis,
iterations,
false,
false,
);
confirm_tx_with_payer_ephem(ix, &payer, &ctx, &mut validator);

// Wait for the task to be scheduled and executed
expect!(ctx.wait_for_delta_slot_ephem(3), validator);

// Check that the counter was incremented
let counter_account =
fetch_counter_ephem(&ctx, &payer.pubkey(), &mut validator);
assert!(
counter_account.count > 0,
cleanup(&mut validator),
"counter.count: {}",
counter_account.count
);

// Wait more to be sure the ledger is persisted
wait_for_ledger_persist(&ctx, &mut validator);
debug!("✅ Ledger persisted");

(validator, payer, counter_account.count)
}

fn read(ledger_path: &Path, kp: &Keypair, count: u64) -> Child {
let (_, mut validator, ctx) =
setup_validator_with_local_remote_and_resume_strategy(
ledger_path,
None,
false,
false,
&LoadedAccounts::with_delegation_program_test_authority(),
);

// Check that the counter persisted its value
let current_count =
fetch_counter_ephem(&ctx, &kp.pubkey(), &mut validator).count;
assert!(
current_count >= count,
cleanup(&mut validator),
"counter.count: {} < {}",
current_count,
count
);

// Wait for the crank to execute
expect!(ctx.wait_for_delta_slot_ephem(10), validator);

// Check that the count increased
let new_count =
fetch_counter_ephem(&ctx, &kp.pubkey(), &mut validator).count;
assert!(
new_count > current_count,
cleanup(&mut validator),
"counter.count: {} <= {}",
new_count,
current_count
);

validator
}
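Taken together, the two phases exercise the full restart path: the write phase schedules a 50 ms crank, verifies it increments the counter, and waits for the ledger to persist; the read phase resumes the same ledger without resetting the task database (enabled by the reset: reset_ledger change above) and checks both that the counter value survived and that the crank keeps firing, which is exactly the restart scenario the 2-slot reschedule clamp makes safe.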