feat: batch cranks#1190

Open
Dodecahedr0x wants to merge 18 commits into master from fix/crank-perfs

Conversation

@Dodecahedr0x
Contributor

@Dodecahedr0x Dodecahedr0x commented May 12, 2026

Closes #1189

Summary

  • Batch crank executions and stop waiting for them to finish before processing more cranks.
  • Batch writes to SQLite: write once when a batch finishes instead of once per execution.
  • Fix setting the next execution so that it accounts for wake-up delay.

Up to 3x speedup for cranks, especially under load.

Breaking Changes

  • None

Summary by CodeRabbit

  • Documentation

    • Updated task scheduler docs and README: switched to min-interval setting and expanded performance guidance for batching, concurrency, SQLite tuning, payload handling, and retry/backoff strategies.
  • Dependencies

    • Bumped solana-pubkey from 3.0 to 4.1.
  • Improvements / Refactor

    • Scheduler now batches and executes due tasks concurrently, applies batch results atomically, tracks per-task update timestamps to avoid stale changes, records failures for clearer retry logic, and shares RPC/counter resources for reduced cloning and improved throughput.

@coderabbitai
Contributor

coderabbitai Bot commented May 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

This PR refactors TaskSchedulerService to batch expired tasks into crank batches, send per-task transactions concurrently using a shared Arc<RpcClient> and Arc<AtomicU64>, and apply all outcomes in a single DB transaction via SchedulerDatabase::apply_crank_batch_completion. DbTask now includes updated_at, and ownership was changed to avoid instruction cloning. SQLite PRAGMAs are set at DB init. TaskSchedulerError gains SizeMismatch. Tests and documentation were updated; Cargo.toml bumps solana-pubkey from 3.0 to 4.1.

Assessment against linked issues

| Objective | Addressed | Explanation |
|---|---|---|
| Batch execution of tasks to improve crank throughput [#1189] | | |

Assessment against linked issues: Out-of-scope changes

| Code Change | Explanation |
|---|---|
| solana-pubkey version bump 3.0 → 4.1 (Cargo.toml) | Dependency version change is not required by the batching objective and is unrelated to batch execution behavior. |
| Docs: replace millis_per_tick -> min-interval and expanded performance guidance (docs/task-scheduler.md, magicblock-task-scheduler/README.md) | Documentation/example edits are informational and not part of the implementation objective for batching. |

Suggested reviewers

  • GabrielePicco
  • thlorenz

@github-actions

github-actions Bot commented May 12, 2026

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/README.md`:
- Around line 38-39: The README claims crank sends are semaphore-bounded but the
scheduler actually fans out sends unrestricted; update either the docs or the
code: either change the README wording to remove "bounded" or implement a
semaphore cap in the scheduler where crank sends are dispatched (the code paths
around send_transaction and the RpcSendTransactionConfig usage in the task
scheduler/crank dispatch routine) so that the send loop respects a semaphore
permit limit from RpcSendTransactionConfig before calling send_transaction,
ensuring concurrent sends are limited.

In `@magicblock-task-scheduler/src/service.rs`:
- Around line 276-299: The current on_crank_batch_completed collapses all
per-task results into one Result causing successful tasks to be retried when any
single task fails; change its input to accept per-task results (e.g.
Vec<(DbTask, TaskSchedulerResult<Signature>)>) and iterate over each tuple: call
apply_successful_execution(task).await? for Ok(signature) entries and call
handle_failed_task_execution(task, &err).await? for Err(err) entries. Update the
on_crank_batch_completed signature and all callers to produce per-task results
(rather than a single TaskSchedulerResult<Vec<Signature>>), and keep using the
existing DbTask, TaskSchedulerResult, Signature, apply_successful_execution and
handle_failed_task_execution symbols to implement per-task handling.
- Around line 477-497: The loop currently spawns an unbounded send_transaction
future per task (inside join_set.spawn) which can flood RPC; limit concurrency
by introducing a bounded semaphore (tokio::sync::Semaphore) or use a buffered
executor (futures::stream::StreamExt::buffer_unordered) so only N concurrent
sends run. Acquire a permit before calling join_set.spawn (or before awaiting
each buffered future) and let the permit drop after the send finishes; also wrap
the rpc_client.send_transaction call with a per-call timeout
(tokio::time::timeout) and propagate or log timeout errors. Update the block
that builds ixs/tx and the async closure around rpc_client.send_transaction
(referenced as rpc_client, tx_counter, join_set.spawn, InstructionUtils::*,
Transaction::new) to use the semaphore/timeout pattern so bursts are bounded.
- Around line 353-360: The loop double-polls the delay queue (calling
self.task_queue.poll_expired(&mut cx) in the while condition and again inside
the match), which discards the first expired entry; change the loop to poll
exactly once per iteration by binding the result from poll_expired and using
that bound value (the expired variable) for processing: call
self.task_queue.poll_expired(&mut cx) once per iteration, call
expired.into_inner() to get task, then remove its id from task_queue_keys and
push it into batch; alternatively replace the while-let with a loop that matches
self.task_queue.poll_expired(&mut cx) and handles Poll::Ready(Some(expired)),
Poll::Ready(None) and Poll::Pending accordingly so no expired entry is dropped.
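
The double-poll finding in the last item can be illustrated with a small stand-in. This is only a sketch: `ExpiredQueue` is a hypothetical type that treats every queued id as already expired and never returns `Poll::Pending`, and it is not the scheduler's actual delay queue. It shows why polling in the loop condition and again in the body drops entries, and what the single-poll shape looks like.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical stand-in for the scheduler's delay queue: every queued id is
/// treated as already expired, and each `poll_expired` call pops one entry.
struct ExpiredQueue {
    items: VecDeque<u64>,
}

impl ExpiredQueue {
    fn poll_expired(&mut self) -> Poll<Option<u64>> {
        Poll::Ready(self.items.pop_front())
    }
}

/// Buggy shape: polls in the loop condition and again in the body, so the
/// entry returned by the first poll of each iteration is silently dropped.
fn drain_double_poll(queue: &mut ExpiredQueue) -> Vec<u64> {
    let mut batch = Vec::new();
    while let Poll::Ready(Some(_)) = queue.poll_expired() {
        if let Poll::Ready(Some(id)) = queue.poll_expired() {
            batch.push(id);
        }
    }
    batch
}

/// Fixed shape: poll exactly once per iteration and use the bound value.
fn drain_single_poll(queue: &mut ExpiredQueue) -> Vec<u64> {
    let mut batch = Vec::new();
    while let Poll::Ready(Some(id)) = queue.poll_expired() {
        batch.push(id);
    }
    batch
}
```

With four expired ids queued, the double-poll version batches only every second one, while the single-poll version batches all four.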

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5be1b3a8-33d5-4c0a-82f4-7011633acd4c

📥 Commits

Reviewing files that changed from the base of the PR and between 4fac49a and 12b286a.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • Cargo.toml
  • docs/task-scheduler.md
  • magicblock-task-scheduler/README.md
  • magicblock-task-scheduler/src/service.rs

Comment thread magicblock-task-scheduler/README.md Outdated
Comment thread magicblock-task-scheduler/src/service.rs
Comment thread magicblock-task-scheduler/src/service.rs Outdated
Comment thread magicblock-task-scheduler/src/service.rs Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
magicblock-task-scheduler/src/service.rs (2)

276-299: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Per-task vs batch error handling still collapses partial successes.

send_crank_batch returns a single TaskSchedulerResult<Vec<Signature>> (Line 494 short-circuits on the first error via collect::<Result<Vec<_>, _>>()), so the Err arm here invokes handle_failed_task_execution for every task in batch — including the ones whose send_transaction already succeeded on-chain. That schedules retries / failure moves for already-executed tasks, producing duplicate executions and inconsistent DB state.

Return per-task outcomes from send_crank_batch (e.g. Vec<(DbTask, TaskSchedulerResult<Signature>)>) and dispatch apply_successful_execution / handle_failed_task_execution per entry instead of per batch.
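
A minimal, synchronous sketch of the difference between the two return shapes follows. `Task`, `Sig`, `SendResult` and `send` are hypothetical stand-ins for `DbTask`, `Signature`, `TaskSchedulerResult<Signature>` and the RPC send, so this does not reflect the PR's actual code. Note that in this synchronous version the collapsed form also skips the remaining sends after the first error, whereas in the PR the concurrent sends have already happened.

```rust
/// Hypothetical stand-in for the PR's `DbTask`.
#[derive(Debug, Clone, PartialEq)]
struct Task {
    id: i64,
}

/// Hypothetical stand-ins for `Signature` and `TaskSchedulerResult<Signature>`.
type Sig = String;
type SendResult = Result<Sig, String>;

/// Pretend send: even ids succeed, odd ids fail.
fn send(task: &Task) -> SendResult {
    if task.id % 2 == 0 {
        Ok(format!("sig-{}", task.id))
    } else {
        Err(format!("send failed for {}", task.id))
    }
}

/// Collapsed shape: collecting into `Result<Vec<_>, _>` keeps only the first
/// error, so the caller cannot tell which tasks succeeded.
fn send_batch_collapsed(tasks: &[Task]) -> Result<Vec<Sig>, String> {
    tasks.iter().map(send).collect()
}

/// Per-task shape: every task travels with its own outcome.
fn send_batch_per_task(tasks: &[Task]) -> Vec<(Task, SendResult)> {
    tasks.iter().map(|t| (t.clone(), send(t))).collect()
}

/// Splits outcomes into ids to mark as executed and ids to retry.
fn partition_outcomes(outcomes: Vec<(Task, SendResult)>) -> (Vec<i64>, Vec<i64>) {
    let mut succeeded = Vec::new();
    let mut failed = Vec::new();
    for (task, result) in outcomes {
        match result {
            Ok(_) => succeeded.push(task.id),
            Err(_) => failed.push(task.id),
        }
    }
    (succeeded, failed)
}
```

For tasks 2, 3 and 4, the collapsed form yields a single error for task 3, while the per-task form lets the caller apply tasks 2 and 4 and retry only task 3.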

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-task-scheduler/src/service.rs` around lines 276 - 299, The current
on_crank_batch_completed matches on a single TaskSchedulerResult<Vec<Signature>>
from send_crank_batch and treats the whole batch as failed if send_crank_batch
returns Err, which collapses partial successes; change send_crank_batch to
return per-task results (e.g. Vec<(DbTask, TaskSchedulerResult<Signature>)> or
Vec<TaskResult>) and update on_crank_batch_completed to iterate the per-task
outcomes and call apply_successful_execution(task).await? for Ok(signature)
entries and handle_failed_task_execution(task, &err).await? for Err(err) entries
so only genuinely failed tasks are retried/marked failed while successful tasks
are applied once. Ensure types and call sites that use send_crank_batch and
TaskSchedulerResult are updated accordingly.

464-496: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Unbounded RPC fan-out and no per-send timeout still present.

The loop spawns one send_transaction future per task into JoinSet with no concurrency cap, and rpc_client.send_transaction(&tx).await (Line 490) has no timeout. A large burst of due tasks can flood the RPC endpoint, and any slow/stuck send blocks the batch's join_all (Line 493) indefinitely, in turn stalling on_crank_batch_completed for the whole batch. Combined with detached tokio::spawns in run, concurrency across in-flight batches is also unbounded.

Please add a bounded Semaphore (or futures::stream::buffer_unordered) and wrap each send_transaction in tokio::time::timeout so blast radius is contained.
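
The idea of a concurrency cap plus a per-send timeout can be sketched with the standard library alone. This swaps in OS threads, a hand-rolled `Semaphore` and `mpsc::recv_timeout` for `tokio::sync::Semaphore` and `tokio::time::timeout`, purely for illustration; all names and the simulated send (id 0 hangs, other ids return quickly) are assumptions, not the scheduler's code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (std has none); illustrative only.
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), available: Condvar::new() }
    }
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}

/// Runs `send` on a helper thread and stops waiting for it after `limit`.
fn send_with_timeout<F>(send: F, limit: Duration) -> Result<u64, String>
where
    F: FnOnce() -> u64 + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(send());
    });
    rx.recv_timeout(limit).map_err(|_| "send timed out".to_string())
}

/// Sends every id with at most `max_in_flight` concurrent sends. Returns the
/// per-id results (in input order) and the peak concurrency observed.
fn send_bounded(
    ids: &[u64],
    max_in_flight: usize,
    limit: Duration,
) -> (Vec<(u64, Result<u64, String>)>, usize) {
    let semaphore = Arc::new(Semaphore::new(max_in_flight));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();
    for &id in ids {
        semaphore.acquire(); // the dispatcher blocks until a permit is free
        let (semaphore, in_flight, peak) =
            (semaphore.clone(), in_flight.clone(), peak.clone());
        handles.push(thread::spawn(move || {
            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            // Simulated send: id 0 hangs past the limit, the rest are fast.
            let result = send_with_timeout(
                move || {
                    let delay_ms = if id == 0 { 600 } else { 5 };
                    thread::sleep(Duration::from_millis(delay_ms));
                    id * 10
                },
                limit,
            );
            in_flight.fetch_sub(1, Ordering::SeqCst);
            semaphore.release();
            (id, result)
        }));
    }
    let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (results, peak.load(Ordering::SeqCst))
}
```

One difference worth keeping in mind: a timed-out std thread keeps running in the background, whereas dropping a timed-out future in tokio stops polling it. Either way, the hung send becomes an ordinary per-task error instead of stalling the whole batch.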

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-task-scheduler/src/service.rs` around lines 464 - 496, The
send_crank_batch function currently spawns an unbounded number of
send_transaction futures into the JoinSet and awaits them with no per-send
timeout; change it to limit concurrency (e.g. create a Semaphore or use
futures::stream::iter(tasks).map(|t| ...).buffer_unordered(CONCURRENCY)) and
acquire a permit for each send so only N RPC calls run concurrently, and wrap
each rpc_client.send_transaction(&tx).await call in
tokio::time::timeout(DURATION, ...); convert timeout expiry into a
TaskSchedulerResult error (or map it to a recoverable error) so
join_all/buffer_unordered does not hang forever; update references inside
send_crank_batch (the JoinSet spawn/collect logic and the send_transaction call)
to use the bounded concurrency + timeout approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 359-368: The spawned batch-send tasks in run() currently use
tokio::spawn and drop the JoinHandle so they remain running after shutdown;
modify the spawn logic to respect self.token by either (A) collect and manage
JoinHandles with a JoinSet/TaskTracker inside the struct and await/join them on
shutdown in run(), or (B) pass a clone of self.token into the async move and
wrap send_crank_batch/send_transaction with a tokio::select! that aborts the
send when token.cancelled() fires; ensure you reference and update the call site
that invokes send_crank_batch(rpc_client, &block, tx_counter, &batch).await as
well as the crank_tx send so tasks are cancelled and cleaned up when
token.cancelled() triggers.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ad2ff50a-2aaa-43ac-8fa3-760b7276ae91

📥 Commits

Reviewing files that changed from the base of the PR and between 12b286a and 68df949.

📒 Files selected for processing (1)
  • magicblock-task-scheduler/src/service.rs

Comment thread magicblock-task-scheduler/src/service.rs Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 367-418: on_crank_batch_completed currently mutates the in-memory
task_queue via apply_crank_success_outcome/apply_crank_failure_outcome before
persisting results; update it to guard against stale instances by (1) including
an instance identifier check (generation or updated_at) when preparing DB
updates and validating the DB write, and (2) deferring any changes to the
in-memory task_queue until after db.apply_crank_batch_completion completes and
confirms the same instance was modified. Specifically: gather success_updates,
success_removals, and failed_records as you do now, but have the DB
return/confirm the instance version(s) it actually touched (or return
affected_ids+versions); then, after await db.apply_crank_batch_completion(...),
iterate the batch and call
apply_crank_success_outcome/apply_crank_failure_outcome only for tasks whose
in-memory generation/updated_at matches the confirmed version from the DB (use
on_crank_batch_completed, apply_crank_success_outcome,
apply_crank_failure_outcome, db.apply_crank_batch_completion and task_queue to
locate and implement these checks).
- Around line 335-362: The spawned futures must return the originating DbTask
along with their TaskSchedulerResult so results are correlated by identity, not
completion order: change the JoinSet spawn closure to return (task.clone(),
TaskSchedulerResult<Signature>) (i.e. Ok((task,
rpc_client.send_transaction(&tx).await.map_err(Box::new)?))) instead of just the
Signature result, then update the join_all handling in
on_crank_batch_completed() to collect Vec<(DbTask,
TaskSchedulerResult<Signature>)> and iterate those tuples to call
apply_crank_success_outcome() or apply_crank_failure_outcome() for the exact
DbTask returned; keep using JoinSet::join_all() but do not zip with the original
batch slice. Ensure types (JoinSet< TaskSchedulerResult... > → JoinSet< (DbTask,
TaskSchedulerResult<Signature>) >) and downstream match arms are updated
accordingly.
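
The stale-instance guard described in the first item amounts to a compare-and-set on `updated_at`. The sketch below models it on an in-memory map; `TaskRow` and `apply_if_current` are hypothetical names, and the real check would live in the SQL executed by the database layer, which is not shown in this conversation.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down task row: only the fields needed for the guard.
#[derive(Debug, Clone, PartialEq)]
struct TaskRow {
    id: i64,
    executions_left: u64,
    updated_at: i64,
}

/// Applies a successful execution only if the row still carries the
/// `updated_at` value the batch was built from. Returns whether it applied.
fn apply_if_current(
    rows: &mut HashMap<i64, TaskRow>,
    id: i64,
    expected_updated_at: i64,
    new_updated_at: i64,
) -> bool {
    match rows.get_mut(&id) {
        Some(row) if row.updated_at == expected_updated_at => {
            row.executions_left = row.executions_left.saturating_sub(1);
            row.updated_at = new_updated_at;
            true
        }
        // Row was rescheduled, replaced, or removed while the batch was in flight.
        _ => false,
    }
}
```

A completion built from an older `updated_at` is rejected rather than overwriting a task that was rescheduled in the meantime, and the caller can skip the matching in-memory queue change when the function returns false.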

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 8bf1ba06-e574-4f33-96ff-49f8a702a8d4

📥 Commits

Reviewing files that changed from the base of the PR and between 68df949 and 8722b70.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • Cargo.toml
  • docs/task-scheduler.md
  • magicblock-task-scheduler/README.md
  • magicblock-task-scheduler/src/db.rs
  • magicblock-task-scheduler/src/errors.rs
  • magicblock-task-scheduler/src/service.rs

Comment thread magicblock-task-scheduler/src/service.rs Outdated
Comment thread magicblock-task-scheduler/src/service.rs
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
magicblock-task-scheduler/src/service.rs (1)

336-374: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Unbounded concurrent RPC sends; no per-send timeout.

send_crank_batch still spawns one send_transaction future per task with no concurrency cap, and the upstream spawner at Line 193 does not bound in-flight batches. A burst of expired tasks (or many parallel batches) fans out into an unbounded number of concurrent RPC calls, which can overload the scheduler and the RPC endpoint. Compounding this, rpc_client.send_transaction(&tx).await (Lines 365–369) has no tokio::time::timeout: a hung RPC keeps join_set.join_all() blocked forever, so the batch result is never delivered to crank_rx, the task’s entry in task_versions stays alive indefinitely, and the cancellation token cannot unwind these in-flight sends.

Add a bounded Semaphore (acquired before each join_set.spawn) and wrap each send_transaction in a tokio::time::timeout, mapping the elapsed case to a retryable error so the existing failure path can recover.

🛡️ Sketch of the fix
 async fn send_crank_batch(
     rpc_client: Arc<RpcClient>,
     block: &LatestBlock,
     tx_counter: Arc<AtomicU64>,
     tasks: &[DbTask],
 ) -> TaskSchedulerResult<Vec<(DbTask, TaskSchedulerResult<Signature>)>>
 {
     let mut join_set: JoinSet<(DbTask, TaskSchedulerResult<Signature>)> =
         JoinSet::new();
     let blockhash = block.load().blockhash;
+    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CRANK_SENDS));
     for task in tasks {
         let rpc_client = rpc_client.clone();
         let tx_counter = tx_counter.clone();
+        let permit = semaphore.clone().acquire_owned().await
+            .expect("semaphore not closed");
         let task = task.clone();
         join_set.spawn(async move {
+            let _permit = permit;
             let ixs = vec![ /* ... */ ];
             let tx = Transaction::new(/* ... */);
-            let res = rpc_client
-                .send_transaction(&tx)
-                .await
-                .map_err(Box::new)
-                .map_err(TaskSchedulerError::from);
+            let res = match tokio::time::timeout(
+                CRANK_SEND_TIMEOUT,
+                rpc_client.send_transaction(&tx),
+            )
+            .await
+            {
+                Ok(r) => r.map_err(Box::new).map_err(TaskSchedulerError::from),
+                Err(_) => Err(TaskSchedulerError::Rpc(/* timed-out variant */)),
+            };
             (task, res)
         });
     }
     Ok(join_set.join_all().await)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-task-scheduler/src/service.rs` around lines 336 - 374,
send_crank_batch currently spawns unbounded send_transaction futures and awaits
join_all without a per-send timeout; fix by introducing a bounded
tokio::sync::Semaphore (e.g., create Arc<Semaphore> with desired concurrency
limit inside send_crank_batch or accept it as a parameter), acquire a permit
before calling join_set.spawn for each task (so the permit guard is moved into
the spawned task and released when the task completes), and wrap the
rpc_client.send_transaction(&tx).await call in tokio::time::timeout(...) mapping
a timeout error into the existing retryable TaskSchedulerError path (so the
timeout becomes a TaskSchedulerResult::Err that existing retry logic can
handle); keep references to identifiers send_crank_batch, JoinSet,
rpc_client.send_transaction, tx_counter, and ensure the semaphore permit is held
while the send and its timeout run.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/src/db.rs`:
- Around line 50-82: Add documentation and clearer types for the public
crank-batch API: document CrankSuccessUpdate, CrankSuccessRemoval,
CrankFailedMove, CrankRetryCheck, and CrankBatchCompletion to explain the
semantics of each field (e.g. which fields are the optimistic
"expected_updated_at" token vs which are actual DB "updated_at" values). Replace
the ambiguous tuple in CrankBatchCompletion::success_updates (HashMap<i64, (i64,
i64)>) with a small named struct or type alias (e.g. SuccessUpdate {
expected_updated_at: i64, new_updated_at: i64 }) and similarly introduce
distinct named types or aliases for the HashMap value types used by
success_removals, failed_moves and retry_checks so callers of
CrankBatchCompletion (e.g. service.rs) can clearly see which maps hold expected
tokens vs actual DB timestamps; update the doc comments to state the meaning of
each map entry.
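
One possible shape for the named value type suggested above is sketched here. The field names follow the reviewer's example; the trimmed-down `CrankBatchCompletion` and the `record_success` helper are hypothetical, and the actual meaning and order of the two `i64` values in db.rs is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical replacement for the `(i64, i64)` tuple value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SuccessUpdate {
    /// Optimistic token: the `updated_at` read before the crank was sent.
    expected_updated_at: i64,
    /// Timestamp to store if the token still matches the row.
    new_updated_at: i64,
}

/// Hypothetical, trimmed-down batch type keyed by task id.
#[derive(Debug, Default)]
struct CrankBatchCompletion {
    /// Task id mapped to its expected token and its new timestamp.
    success_updates: HashMap<i64, SuccessUpdate>,
}

impl CrankBatchCompletion {
    /// Records a successful execution for `task_id`.
    fn record_success(
        &mut self,
        task_id: i64,
        expected_updated_at: i64,
        new_updated_at: i64,
    ) {
        self.success_updates.insert(
            task_id,
            SuccessUpdate { expected_updated_at, new_updated_at },
        );
    }
}
```

With named fields, a call site that swaps the two timestamps becomes visible in review instead of hiding behind `.0` and `.1`.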


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: c1ce4dd4-e4f0-4a53-9727-75cfdf7a996f

📥 Commits

Reviewing files that changed from the base of the PR and between 8722b70 and e895ec0.

📒 Files selected for processing (2)
  • magicblock-task-scheduler/src/db.rs
  • magicblock-task-scheduler/src/service.rs

Comment thread magicblock-task-scheduler/src/db.rs
Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
magicblock-task-scheduler/src/service.rs (1)

789-803: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

This test passes even if nothing is scheduled.

The loop only errors when tasks.len() > 1, then the assertion is unwrap_err() on the timeout. If both requests are ignored and the DB stays empty, the test still succeeds. It should wait for len() == 1 instead.

Suggested fix
-        timeout(Duration::from_secs(1), async move {
-            loop {
-                let tasks = db.get_tasks().await.unwrap();
-                if tasks.len() > 1 {
-                    return Err::<(), String>(format!(
-                        "Tasks should be 1, got {}",
-                        tasks.len()
-                    ));
-                }
-                tokio::time::sleep(Duration::from_millis(100)).await;
-            }
-        })
-        .await
-        .unwrap_err();
+        timeout(Duration::from_secs(1), async move {
+            loop {
+                let tasks = db.get_tasks().await.unwrap();
+                if tasks.len() == 1 {
+                    return Ok::<(), String>(());
+                }
+                if tasks.len() > 1 {
+                    return Err(format!("Tasks should be 1, got {}", tasks.len()));
+                }
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+        })
+        .await
+        .unwrap()
+        .unwrap();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-task-scheduler/src/service.rs` around lines 789 - 803, The test
currently only errors when tasks.len() > 1 and then calls .await.unwrap_err(),
so it also passes if the DB stays empty; change the loop that polls
db.get_tasks().await.unwrap() to wait for tasks.len() == 1: inside the loop, if
tasks.len() == 1 return Ok(()), if tasks.len() > 1 return Err(format!("Tasks
should be 1, got {}", tasks.len())), otherwise sleep and continue; then call
timeout(...).await.unwrap() (not unwrap_err()) so the timeout will fail if the
expected single task never appears; references: timeout, db.get_tasks,
tasks.len, tokio::time::sleep, unwrap_err/unwrap.
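
The polling logic behind this suggestion can be isolated into a small helper, sketched here with blocking std calls instead of tokio. `wait_for_single_task` and its `count` closure (standing in for `db.get_tasks().await.unwrap().len()`) are hypothetical names.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `count` until it reports exactly one task. Fails as soon as more
/// than one task is seen, or when `limit` elapses with no task present.
fn wait_for_single_task<F: FnMut() -> usize>(
    mut count: F,
    limit: Duration,
    interval: Duration,
) -> Result<(), String> {
    let deadline = Instant::now() + limit;
    loop {
        match count() {
            1 => return Ok(()),
            n if n > 1 => return Err(format!("Tasks should be 1, got {}", n)),
            _ => {}
        }
        if Instant::now() >= deadline {
            return Err("timed out waiting for a task".to_string());
        }
        thread::sleep(interval);
    }
}
```

Unlike the original loop, an empty database now fails the check. This shape returns as soon as one task is visible, so it would not catch a duplicate that is inserted later; a test that must rule that out could keep polling for a short grace period after the first success.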
♻️ Duplicate comments (2)
magicblock-task-scheduler/src/service.rs (2)

347-376: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Add backpressure and a timeout around per-task RPC sends.

This still spawns one RPC send per task with no concurrency cap, then waits on join_all() for the slowest one. A large expiry burst can flood the shared RpcClient, and one hung send_transaction stalls completion handling for the entire batch.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-task-scheduler/src/service.rs` around lines 347 - 376, The loop
currently spawns an unbounded JoinSet of per-task RPC sends (join_set.spawn)
which can flood rpc_client and block all results; modify to apply concurrency
backpressure and a per-RPC timeout: create a tokio::sync::Semaphore with a
configured max_concurrency and acquire a permit before spawning each task
(release the permit when the task finishes), and wrap
rpc_client.send_transaction(&tx).await in tokio::time::timeout(duration, ...),
mapping a timeout error into an appropriate TaskSchedulerError variant so the
task returns (task, Err(...)) instead of hanging; keep the existing
tx_counter.fetch_add usage and preserve the (DbTask,
TaskSchedulerResult<Signature>) return shape so join_set.join_all().await still
works.

194-198: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Don't let in-flight batches outlive shutdown.

Lines 194-198 detach the batch sender, and Line 218 breaks the loop immediately on cancellation. If shutdown happens after send_transaction has been submitted but before crank_tx.send(...) succeeds, the on-chain side effect can still land after crank_rx is dropped, so no completion is persisted and the same task can be replayed on restart.

Also applies to: 218-225

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-task-scheduler/src/service.rs` around lines 194 - 198, The spawned
task detaches Self::send_crank_batch so an on-chain transaction may succeed
after crank_rx is dropped and its completion never persisted; instead of
fire-and-forget, capture the tokio::spawn JoinHandle (or push futures into a
FuturesUnordered) when you call tokio::spawn for send_crank_batch and ensure you
await or properly abort those handles during shutdown so crank_tx.send((batch,
result)) runs while the receiver (crank_rx) still exists; update the code paths
around the tokio::spawn call and the shutdown/loop-break logic (the block that
currently breaks on cancellation) to wait for those JoinHandles to complete
before returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-task-scheduler/README.md`:
- Line 40: The README mentions the wrong internal API name; update the reference
from apply_crank_completion_batch to the correct symbol
apply_crank_batch_completion so the docs match the implementation and avoid
drift (search for the string apply_crank_completion_batch and replace it with
apply_crank_batch_completion).


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ae1ac4be-e1a2-44a4-8651-dfe2e04b5603

📥 Commits

Reviewing files that changed from the base of the PR and between e895ec0 and 5f1f014.

📒 Files selected for processing (3)
  • docs/task-scheduler.md
  • magicblock-task-scheduler/README.md
  • magicblock-task-scheduler/src/service.rs

Comment thread magicblock-task-scheduler/README.md Outdated
@Dodecahedr0x Dodecahedr0x marked this pull request as ready for review May 15, 2026 10:18
Comment thread magicblock-task-scheduler/src/service.rs Outdated
Comment thread magicblock-task-scheduler/src/service.rs
Contributor

@taco-paco taco-paco left a comment

LGTM! Left a small comment.


info!("TaskSchedulerService shutdown!");
for _ in 0..inflight_batches {
let Some((batch, result)) = crank_rx.recv().await else {
Contributor

I think that would do the job without needing the new inflight_batches variable, unless I'm missing something.

Suggested change
-            let Some((batch, result)) = crank_rx.recv().await else {
+            while let Some((batch, result)) = crank_rx.recv().await {
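
Draining until the channel closes works without a counter only when every sender has been dropped by the time the last completion arrives. The sketch below shows that condition with `std::sync::mpsc` and threads as a stand-in for the tokio channel and spawned batch tasks; `drain_completions` is a hypothetical name, and whether the service keeps its own `crank_tx` alive during shutdown is not visible in this excerpt.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `batches` workers that each report one completion, then drains the
/// channel until every sender is gone. No in-flight counter is needed.
fn drain_completions(batches: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<usize>();
    for batch_id in 0..batches {
        let tx = tx.clone();
        thread::spawn(move || {
            // The worker's sender clone is dropped when the thread finishes.
            let _ = tx.send(batch_id);
        });
    }
    // Drop the original sender. If it stayed alive, `recv` would block
    // forever after the last completion instead of reporting a closed channel.
    drop(tx);

    let mut done = Vec::new();
    while let Ok(batch_id) = rx.recv() {
        done.push(batch_id);
    }
    done.sort_unstable();
    done
}
```

tokio's `mpsc::Receiver::recv` behaves the same way in this respect: it returns `None` only once all senders are dropped and the buffer is empty. If the service holds a sender for spawning future batches, it would need to drop it before the loop, or keep the counter.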

Development

Successfully merging this pull request may close these issues.

feat: enhance crank perfs

3 participants