feat: batch cranks #1190
📝 Walkthrough
This PR refactors `TaskSchedulerService` to group expired tasks into crank batches, send per-task transactions concurrently through shared `Arc`-wrapped handles, and apply all outcomes in a single DB transaction via `SchedulerDatabase::apply_crank_batch_completion`. `DbTask` now includes `updated_at`, and ownership was changed to avoid cloning instructions. SQLite PRAGMAs are set at DB init. `TaskSchedulerError` gains a `SizeMismatch` variant. Tests and documentation were updated; Cargo.toml bumps solana-pubkey to 4.1.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/README.md`:
- Around line 38-39: The README claims crank sends are semaphore-bounded but the
scheduler actually fans out sends unrestricted; update either the docs or the
code: either change the README wording to remove "bounded" or implement a
semaphore cap in the scheduler where crank sends are dispatched (the code paths
around send_transaction and the RpcSendTransactionConfig usage in the task
scheduler/crank dispatch routine) so that the send loop respects a semaphore
permit limit from RpcSendTransactionConfig before calling send_transaction,
ensuring concurrent sends are limited.
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 276-299: The current on_crank_batch_completed collapses all
per-task results into one Result causing successful tasks to be retried when any
single task fails; change its input to accept per-task results (e.g.
Vec<(DbTask, TaskSchedulerResult<Signature>)>) and iterate over each tuple: call
apply_successful_execution(task).await? for Ok(signature) entries and call
handle_failed_task_execution(task, &err).await? for Err(err) entries. Update the
on_crank_batch_completed signature and all callers to produce per-task results
(rather than a single TaskSchedulerResult<Vec<Signature>>), and keep using the
existing DbTask, TaskSchedulerResult, Signature, apply_successful_execution and
handle_failed_task_execution symbols to implement per-task handling.
- Around line 477-497: The loop currently spawns an unbounded send_transaction
future per task (inside join_set.spawn) which can flood RPC; limit concurrency
by introducing a bounded semaphore (tokio::sync::Semaphore) or use a buffered
executor (futures::stream::StreamExt::buffer_unordered) so only N concurrent
sends run. Acquire a permit before calling join_set.spawn (or before awaiting
each buffered future) and let the permit drop after the send finishes; also wrap
the rpc_client.send_transaction call with a per-call timeout
(tokio::time::timeout) and propagate or log timeout errors. Update the block
that builds ixs/tx and the async closure around rpc_client.send_transaction
(referenced as rpc_client, tx_counter, join_set.spawn, InstructionUtils::*,
Transaction::new) to use the semaphore/timeout pattern so bursts are bounded.
- Around line 353-360: The loop double-polls the delay queue (calling
self.task_queue.poll_expired(&mut cx) in the while condition and again inside
the match), which discards the first expired entry; change the loop to poll
exactly once per iteration by binding the result from poll_expired and using
that bound value (the expired variable) for processing: call
self.task_queue.poll_expired(&mut cx) once per iteration, call
expired.into_inner() to get task, then remove its id from task_queue_keys and
push it into batch; alternatively replace the while-let with a loop that matches
self.task_queue.poll_expired(&mut cx) and handles Poll::Ready(Some(expired)),
Poll::Ready(None) and Poll::Pending accordingly so no expired entry is dropped.
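The double-poll problem described in the last item can be reproduced without the real queue. The sketch below is a minimal illustration, assuming a hypothetical `FakeQueue` whose `poll_expired` consumes one entry per call (the real method also takes a `&mut Context`, which is omitted here). It contrasts the buggy loop shape with a loop that polls exactly once per iteration.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical stand-in for the scheduler's delay queue. Not the real type:
/// the real `poll_expired` also takes a `&mut Context`.
pub struct FakeQueue {
    ready: VecDeque<u32>,
}

impl FakeQueue {
    pub fn new(ids: &[u32]) -> Self {
        Self { ready: ids.iter().copied().collect() }
    }

    /// Each call consumes one expired entry, so calling it twice loses one.
    pub fn poll_expired(&mut self) -> Poll<Option<u32>> {
        match self.ready.pop_front() {
            Some(id) => Poll::Ready(Some(id)),
            None => Poll::Pending,
        }
    }
}

/// Buggy shape: polls in the loop condition and again in the body,
/// so the entry returned by the first poll is thrown away.
pub fn drain_double_poll(queue: &mut FakeQueue) -> Vec<u32> {
    let mut batch = Vec::new();
    while let Poll::Ready(Some(_discarded)) = queue.poll_expired() {
        if let Poll::Ready(Some(id)) = queue.poll_expired() {
            batch.push(id);
        }
    }
    batch
}

/// Fixed shape: one poll per iteration, every ready entry lands in the batch.
pub fn drain_once_per_iteration(queue: &mut FakeQueue) -> Vec<u32> {
    let mut batch = Vec::new();
    loop {
        match queue.poll_expired() {
            Poll::Ready(Some(id)) => batch.push(id),
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    batch
}
```

With four ready entries `1, 2, 3, 4`, the double-poll loop collects only `2` and `4`, while the single-poll loop collects all four.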
⛔ Files ignored due to path filters (2)
- Cargo.lock is excluded by !**/*.lock
- test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
- Cargo.toml
- docs/task-scheduler.md
- magicblock-task-scheduler/README.md
- magicblock-task-scheduler/src/service.rs
Actionable comments posted: 1
♻️ Duplicate comments (2)
magicblock-task-scheduler/src/service.rs (2)
276-299: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift
Per-task vs batch error handling still collapses partial successes.
`send_crank_batch` returns a single `TaskSchedulerResult<Vec<Signature>>` (Line 494 short-circuits on the first error via `collect::<Result<Vec<_>, _>>()`), so the `Err` arm here invokes `handle_failed_task_execution` for every task in `batch`, including the ones whose `send_transaction` already succeeded on-chain. That schedules retries / failure moves for already-executed tasks, producing duplicate executions and inconsistent DB state.
Return per-task outcomes from `send_crank_batch` (e.g. `Vec<(DbTask, TaskSchedulerResult<Signature>)>`) and dispatch `apply_successful_execution` / `handle_failed_task_execution` per entry instead of per batch.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 276 - 299, The current on_crank_batch_completed matches on a single TaskSchedulerResult<Vec<Signature>> from send_crank_batch and treats the whole batch as failed if send_crank_batch returns Err, which collapses partial successes; change send_crank_batch to return per-task results (e.g. Vec<(DbTask, TaskSchedulerResult<Signature>)> or Vec<TaskResult>) and update on_crank_batch_completed to iterate the per-task outcomes and call apply_successful_execution(task).await? for Ok(signature) entries and handle_failed_task_execution(task, &err).await? for Err(err) entries so only genuinely failed tasks are retried/marked failed while successful tasks are applied once. Ensure types and call sites that use send_crank_batch and TaskSchedulerResult are updated accordingly.
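A rough sketch of the per-task dispatch this comment asks for, using only the standard library. The types are hypothetical stand-ins (an `i64` id for `DbTask`, a `String` for `Signature` and for the error), and `split_batch` is an illustrative name rather than anything in the PR. The point is that each result stays paired with its task, so one failure does not turn the whole batch into an `Err` the way `collect::<Result<Vec<_>, _>>()` does.

```rust
/// Hypothetical stand-ins for the PR's types: a task is an i64 id,
/// a signature and an error are both plain Strings.
pub type TaskId = i64;
pub type SendResult = Result<String, String>;

/// Per-task outcomes of one crank batch.
pub struct BatchOutcome {
    /// Tasks whose send succeeded, with their signature.
    pub succeeded: Vec<(TaskId, String)>,
    /// Tasks whose send failed, with the error to hand to the failure path.
    pub failed: Vec<(TaskId, String)>,
}

/// Walks the per-task results once and routes each entry on its own,
/// instead of collapsing the batch into a single Result.
pub fn split_batch(results: Vec<(TaskId, SendResult)>) -> BatchOutcome {
    let mut outcome = BatchOutcome {
        succeeded: Vec::new(),
        failed: Vec::new(),
    };
    for (task_id, result) in results {
        match result {
            Ok(signature) => outcome.succeeded.push((task_id, signature)),
            Err(error) => outcome.failed.push((task_id, error)),
        }
    }
    outcome
}
```

In the scheduler, the `succeeded` entries would go to the success path and the `failed` entries to the retry/failure path, so an already-executed task is never retried because a sibling failed.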
464-496: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Unbounded RPC fan-out and no per-send timeout still present.
The loop spawns one `send_transaction` future per task into `JoinSet` with no concurrency cap, and `rpc_client.send_transaction(&tx).await` (Line 490) has no timeout. A large burst of due tasks can flood the RPC endpoint, and any slow/stuck send blocks the batch's `join_all` (Line 493) indefinitely, in turn stalling `on_crank_batch_completed` for the whole batch. Combined with detached `tokio::spawn`s in `run`, concurrency across in-flight batches is also unbounded.
Please add a bounded `Semaphore` (or `futures::stream::buffer_unordered`) and wrap each `send_transaction` in `tokio::time::timeout` so blast radius is contained.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 464 - 496, The send_crank_batch function currently spawns an unbounded number of send_transaction futures into the JoinSet and awaits them with no per-send timeout; change it to limit concurrency (e.g. create a Semaphore or use futures::stream::iter(tasks).map(|t| ...).buffer_unordered(CONCURRENCY)) and acquire a permit for each send so only N RPC calls run concurrently, and wrap each rpc_client.send_transaction(&tx).await call in tokio::time::timeout(DURATION, ...); convert timeout expiry into a TaskSchedulerResult error (or map it to a recoverable error) so join_all/buffer_unordered does not hang forever; update references inside send_crank_batch (the JoinSet spawn/collect logic and the send_transaction call) to use the bounded concurrency + timeout approach.
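To make the concurrency cap concrete, here is a deliberately simplified sketch that swaps in OS threads and chunking from the standard library in place of the tokio `Semaphore` the comment recommends. `send_bounded`, the `i64` task ids and the `u64` result are all hypothetical. It bounds in-flight sends to `limit` and keeps each result paired with its task, but it does not implement the per-send timeout, and unlike a semaphore it waits for the slowest send in each chunk before starting the next.

```rust
use std::thread;

/// Runs `send` for every task id with at most `limit` sends in flight,
/// returning each id paired with its own result.
/// Assumption: chunking stands in for a semaphore; no timeout is applied.
pub fn send_bounded<F>(
    tasks: &[i64],
    limit: usize,
    send: F,
) -> Vec<(i64, Result<u64, String>)>
where
    F: Fn(i64) -> Result<u64, String> + Sync,
{
    let mut out = Vec::with_capacity(tasks.len());
    // `chunks` panics on 0, so treat a zero limit as 1.
    for chunk in tasks.chunks(limit.max(1)) {
        // Scoped threads may borrow `send`; all of them are joined
        // before the next chunk starts, which is what caps concurrency.
        let results: Vec<(i64, Result<u64, String>)> = thread::scope(|scope| {
            let handles: Vec<_> = chunk
                .iter()
                .map(|&id| {
                    let send = &send;
                    scope.spawn(move || (id, send(id)))
                })
                .collect();
            handles
                .into_iter()
                .map(|handle| handle.join().expect("send thread panicked"))
                .collect()
        });
        out.extend(results);
    }
    out
}
```

A semaphore-based version would keep the pipeline full instead of idling until each chunk finishes; the chunked form is used here only because it needs no external crates.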
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 359-368: The spawned batch-send tasks in run() currently use
tokio::spawn and drop the JoinHandle so they remain running after shutdown;
modify the spawn logic to respect self.token by either (A) collect and manage
JoinHandles with a JoinSet/TaskTracker inside the struct and await/join them on
shutdown in run(), or (B) pass a clone of self.token into the async move and
wrap send_crank_batch/send_transaction with a tokio::select! that aborts the
send when token.cancelled() fires; ensure you reference and update the call site
that invokes send_crank_batch(rpc_client, &block, tx_counter, &batch).await as
well as the crank_tx send so tasks are cancelled and cleaned up when
token.cancelled() triggers.
📒 Files selected for processing (1)
magicblock-task-scheduler/src/service.rs
68df949 to e2c1cef
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/src/service.rs`:
- Around line 367-418: on_crank_batch_completed currently mutates the in-memory
task_queue via apply_crank_success_outcome/apply_crank_failure_outcome before
persisting results; update it to guard against stale instances by (1) including
an instance identifier check (generation or updated_at) when preparing DB
updates and validating the DB write, and (2) deferring any changes to the
in-memory task_queue until after db.apply_crank_batch_completion completes and
confirms the same instance was modified. Specifically: gather success_updates,
success_removals, and failed_records as you do now, but have the DB
return/confirm the instance version(s) it actually touched (or return
affected_ids+versions); then, after await db.apply_crank_batch_completion(...),
iterate the batch and call
apply_crank_success_outcome/apply_crank_failure_outcome only for tasks whose
in-memory generation/updated_at matches the confirmed version from the DB (use
on_crank_batch_completed, apply_crank_success_outcome,
apply_crank_failure_outcome, db.apply_crank_batch_completion and task_queue to
locate and implement these checks).
- Around line 335-362: The spawned futures must return the originating DbTask
along with their TaskSchedulerResult so results are correlated by identity, not
completion order: change the JoinSet spawn closure to return (task.clone(),
TaskSchedulerResult<Signature>) (i.e. Ok((task,
rpc_client.send_transaction(&tx).await.map_err(Box::new)?))) instead of just the
Signature result, then update the join_all handling in
on_crank_batch_completed() to collect Vec<(DbTask,
TaskSchedulerResult<Signature>)> and iterate those tuples to call
apply_crank_success_outcome() or apply_crank_failure_outcome() for the exact
DbTask returned; keep using JoinSet::join_all() but do not zip with the original
batch slice. Ensure types (JoinSet< TaskSchedulerResult... > → JoinSet< (DbTask,
TaskSchedulerResult<Signature>) >) and downstream match arms are updated
accordingly.
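The second item is easier to see with a small example. The sketch below uses hypothetical stand-in types (`i64` task ids, `String` signatures and errors) and illustrative function names. It shows how zipping the original batch with results that arrive in completion order can blame the wrong task, while results that carry their task id cannot be mispaired.

```rust
/// Hypothetical stand-in for a send result: Ok(signature) or Err(message).
pub type SendResult = Result<String, String>;

/// Hazard: pairs results with tasks purely by position. This is only correct
/// if results arrive in spawn order, which completion-ordered joins do not give.
pub fn pair_by_position(batch: &[i64], results: Vec<SendResult>) -> Vec<(i64, SendResult)> {
    batch.iter().copied().zip(results).collect()
}

/// Returns the ids of the tasks that failed, trusting the id in each tuple.
pub fn failed_ids(outcomes: &[(i64, SendResult)]) -> Vec<i64> {
    outcomes
        .iter()
        .filter(|(_, result)| result.is_err())
        .map(|(id, _)| *id)
        .collect()
}
```

Suppose the batch is `[10, 20]` and task 20 finishes first with an error. Positional pairing over the completion-ordered results `[Err, Ok]` reports task 10 as failed, whereas tagged results `[(20, Err), (10, Ok)]` report task 20.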
⛔ Files ignored due to path filters (2)
- Cargo.lock is excluded by !**/*.lock
- test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
- Cargo.toml
- docs/task-scheduler.md
- magicblock-task-scheduler/README.md
- magicblock-task-scheduler/src/db.rs
- magicblock-task-scheduler/src/errors.rs
- magicblock-task-scheduler/src/service.rs
Actionable comments posted: 1
♻️ Duplicate comments (1)
magicblock-task-scheduler/src/service.rs (1)
336-374: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Unbounded concurrent RPC sends; no per-send timeout.
`send_crank_batch` still spawns one `send_transaction` future per task with no concurrency cap, and the upstream spawner at Line 193 does not bound in-flight batches. A burst of expired tasks (or many parallel batches) fans out into an unbounded number of concurrent RPC calls, which can overload the scheduler and the RPC endpoint. Compounding this, `rpc_client.send_transaction(&tx).await` (Lines 365–369) has no `tokio::time::timeout`: a hung RPC keeps `join_set.join_all()` blocked forever, so the batch result is never delivered to `crank_rx`, the task’s entry in `task_versions` stays alive indefinitely, and the cancellation token cannot unwind these in-flight sends.
Add a bounded `Semaphore` (acquired before each `join_set.spawn`) and wrap each `send_transaction` in a `tokio::time::timeout`, mapping the elapsed case to a retryable error so the existing failure path can recover.
🛡️ Sketch of the fix

     async fn send_crank_batch(
         rpc_client: Arc<RpcClient>,
         block: &LatestBlock,
         tx_counter: Arc<AtomicU64>,
         tasks: &[DbTask],
     ) -> TaskSchedulerResult<Vec<(DbTask, TaskSchedulerResult<Signature>)>> {
         let mut join_set: JoinSet<(DbTask, TaskSchedulerResult<Signature>)> = JoinSet::new();
         let blockhash = block.load().blockhash;
    +    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CRANK_SENDS));
         for task in tasks {
             let rpc_client = rpc_client.clone();
             let tx_counter = tx_counter.clone();
    +        let permit = semaphore.clone().acquire_owned().await
    +            .expect("semaphore not closed");
             let task = task.clone();
             join_set.spawn(async move {
    +            let _permit = permit;
                 let ixs = vec![ /* ... */ ];
                 let tx = Transaction::new(/* ... */);
    -            let res = rpc_client
    -                .send_transaction(&tx)
    -                .await
    -                .map_err(Box::new)
    -                .map_err(TaskSchedulerError::from);
    +            let res = match tokio::time::timeout(
    +                CRANK_SEND_TIMEOUT,
    +                rpc_client.send_transaction(&tx),
    +            )
    +            .await
    +            {
    +                Ok(r) => r.map_err(Box::new).map_err(TaskSchedulerError::from),
    +                Err(_) => Err(TaskSchedulerError::Rpc(/* timed-out variant */)),
    +            };
                 (task, res)
             });
         }
         Ok(join_set.join_all().await)
     }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 336 - 374, send_crank_batch currently spawns unbounded send_transaction futures and awaits join_all without a per-send timeout; fix by introducing a bounded tokio::sync::Semaphore (e.g., create Arc<Semaphore> with desired concurrency limit inside send_crank_batch or accept it as a parameter), acquire a permit before calling join_set.spawn for each task (so the permit guard is moved into the spawned task and released when the task completes), and wrap the rpc_client.send_transaction(&tx).await call in tokio::time::timeout(...) mapping a timeout error into the existing retryable TaskSchedulerError path (so the timeout becomes a TaskSchedulerResult::Err that existing retry logic can handle); keep references to identifiers send_crank_batch, JoinSet, rpc_client.send_transaction, tx_counter, and ensure the semaphore permit is held while the send and its timeout run.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/src/db.rs`:
- Around line 50-82: Add documentation and clearer types for the public
crank-batch API: document CrankSuccessUpdate, CrankSuccessRemoval,
CrankFailedMove, CrankRetryCheck, and CrankBatchCompletion to explain the
semantics of each field (e.g. which fields are the optimistic
"expected_updated_at" token vs which are actual DB "updated_at" values). Replace
the ambiguous tuple in CrankBatchCompletion::success_updates (HashMap<i64, (i64,
i64)>) with a small named struct or type alias (e.g. SuccessUpdate {
expected_updated_at: i64, new_updated_at: i64 }) and similarly introduce
distinct named types or aliases for the HashMap value types used by
success_removals, failed_moves and retry_checks so callers of
CrankBatchCompletion (e.g. service.rs) can clearly see which maps hold expected
tokens vs actual DB timestamps; update the doc comments to state the meaning of
each map entry.
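One possible shape for the named type, sketched with the standard library only. The field names follow the example in the comment above; the compare-and-set semantics, the `apply_success_updates` helper, and the in-memory `HashMap` standing in for the tasks table are assumptions made for illustration, not the PR's actual implementation.

```rust
use std::collections::HashMap;

/// Hypothetical named replacement for the `(i64, i64)` tuple value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SuccessUpdate {
    /// Optimistic token: the `updated_at` the scheduler saw when it picked up the task.
    pub expected_updated_at: i64,
    /// Timestamp to store if the token still matches what is persisted.
    pub new_updated_at: i64,
}

/// Applies each update only when the stored `updated_at` equals the expected token.
/// `stored` maps task id -> persisted `updated_at` (a stand-in for the DB table).
/// Returns the sorted ids that were actually updated; stale or missing ids are skipped.
pub fn apply_success_updates(
    stored: &mut HashMap<i64, i64>,
    updates: &HashMap<i64, SuccessUpdate>,
) -> Vec<i64> {
    let mut applied = Vec::new();
    for (task_id, update) in updates {
        if let Some(current) = stored.get_mut(task_id) {
            if *current == update.expected_updated_at {
                *current = update.new_updated_at;
                applied.push(*task_id);
            }
        }
    }
    applied.sort_unstable();
    applied
}
```

With named fields, a caller can no longer swap the expected token and the new timestamp without it being visible at the call site, which is the readability gain the comment is after.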
📒 Files selected for processing (2)
- magicblock-task-scheduler/src/db.rs
- magicblock-task-scheduler/src/service.rs
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-task-scheduler/src/service.rs (1)
789-803: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
This test passes even if nothing is scheduled.
The loop only errors when `tasks.len() > 1`, then the assertion is `unwrap_err()` on the timeout. If both requests are ignored and the DB stays empty, the test still succeeds. It should wait for `len() == 1` instead.
Suggested fix

    -    timeout(Duration::from_secs(1), async move {
    -        loop {
    -            let tasks = db.get_tasks().await.unwrap();
    -            if tasks.len() > 1 {
    -                return Err::<(), String>(format!(
    -                    "Tasks should be 1, got {}",
    -                    tasks.len()
    -                ));
    -            }
    -            tokio::time::sleep(Duration::from_millis(100)).await;
    -        }
    -    })
    -    .await
    -    .unwrap_err();
    +    timeout(Duration::from_secs(1), async move {
    +        loop {
    +            let tasks = db.get_tasks().await.unwrap();
    +            if tasks.len() == 1 {
    +                return Ok::<(), String>(());
    +            }
    +            if tasks.len() > 1 {
    +                return Err(format!("Tasks should be 1, got {}", tasks.len()));
    +            }
    +            tokio::time::sleep(Duration::from_millis(100)).await;
    +        }
    +    })
    +    .await
    +    .unwrap()
    +    .unwrap();

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 789 - 803, The test currently only errors when tasks.len() > 1 and then calls .await.unwrap_err(), so it also passes if the DB stays empty; change the loop that polls db.get_tasks().await.unwrap() to wait for tasks.len() == 1: inside the loop, if tasks.len() == 1 return Ok(()), if tasks.len() > 1 return Err(format!("Tasks should be 1, got {}", tasks.len())), otherwise sleep and continue; then call timeout(...).await.unwrap() (not unwrap_err()) so the timeout will fail if the expected single task never appears; references: timeout, db.get_tasks, tasks.len, tokio::time::sleep, unwrap_err/unwrap.
♻️ Duplicate comments (2)
magicblock-task-scheduler/src/service.rs (2)
347-376: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Add backpressure and a timeout around per-task RPC sends.
This still spawns one RPC send per task with no concurrency cap, then waits on `join_all()` for the slowest one. A large expiry burst can flood the shared `RpcClient`, and one hung `send_transaction` stalls completion handling for the entire batch.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 347 - 376, The loop currently spawns an unbounded JoinSet of per-task RPC sends (join_set.spawn) which can flood rpc_client and block all results; modify to apply concurrency backpressure and a per-RPC timeout: create a tokio::sync::Semaphore with a configured max_concurrency and acquire a permit before spawning each task (release the permit when the task finishes), and wrap rpc_client.send_transaction(&tx).await in tokio::time::timeout(duration, ...), mapping a timeout error into an appropriate TaskSchedulerError variant so the task returns (task, Err(...)) instead of hanging; keep the existing tx_counter.fetch_add usage and preserve the (DbTask, TaskSchedulerResult<Signature>) return shape so join_set.join_all().await still works.
194-198: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift
Don't let in-flight batches outlive shutdown.
Lines 194-198 detach the batch sender, and Line 218 breaks the loop immediately on cancellation. If shutdown happens after `send_transaction` has been submitted but before `crank_tx.send(...)` succeeds, the on-chain side effect can still land after `crank_rx` is dropped, so no completion is persisted and the same task can be replayed on restart.
Also applies to: 218-225
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-task-scheduler/src/service.rs` around lines 194 - 198, The spawned task detaches Self::send_crank_batch so an on-chain transaction may succeed after crank_rx is dropped and its completion never persisted; instead of fire-and-forget, capture the tokio::spawn JoinHandle (or push futures into a FuturesUnordered) when you call tokio::spawn for send_crank_batch and ensure you await or properly abort those handles during shutdown so crank_tx.send((batch, result)) runs while the receiver (crank_rx) still exists; update the code paths around the tokio::spawn call and the shutdown/loop-break logic (the block that currently breaks on cancellation) to wait for those JoinHandles to complete before returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-task-scheduler/README.md`:
- Line 40: The README mentions the wrong internal API name; update the reference
from apply_crank_completion_batch to the correct symbol
apply_crank_batch_completion so the docs match the implementation and avoid
drift (search for the string apply_crank_completion_batch and replace it with
apply_crank_batch_completion).
📒 Files selected for processing (3)
- docs/task-scheduler.md
- magicblock-task-scheduler/README.md
- magicblock-task-scheduler/src/service.rs
taco-paco left a comment
LGTM! left a small comment

    info!("TaskSchedulerService shutdown!");
    for _ in 0..inflight_batches {
        let Some((batch, result)) = crank_rx.recv().await else {

I think that would do the job without needing the new inflight_batches variable, unless I'm missing something:

    - let Some((batch, result)) = crank_rx.recv().await else {
    + while let Some((batch, result)) = crank_rx.recv().await {
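The `while let` form in this suggestion relies on the channel closing once every sender is gone. The sketch below illustrates that drain pattern with `std::sync::mpsc` and OS threads as an analogue; the PR itself uses an async channel, and `drain_on_shutdown` and the `u32` batch ids are hypothetical. One caveat it makes visible: the loop only terminates if the draining side drops its own sender first, which may or may not hold in the service.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `batches` workers that each report one completion, then drains
/// the channel until every sender is gone. Returns the completions seen, sorted.
pub fn drain_on_shutdown(batches: u32) -> Vec<u32> {
    let (crank_tx, crank_rx) = mpsc::channel::<u32>();
    for batch_id in 0..batches {
        let tx = crank_tx.clone();
        thread::spawn(move || {
            // Stand-in for sending a batch and reporting its result.
            let _ = tx.send(batch_id);
        });
    }
    // Drop the original sender; otherwise the loop below would never end.
    drop(crank_tx);

    let mut seen = Vec::new();
    // `recv` returns Err only when the queue is empty and all senders are
    // dropped, so no in-flight counter is needed to know when to stop.
    while let Ok(batch_id) = crank_rx.recv() {
        seen.push(batch_id);
    }
    seen.sort_unstable();
    seen
}
```

If the service keeps a long-lived sender clone alive during shutdown, a counter such as `inflight_batches` would still be needed to know when to stop waiting.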
Closes #1189
Summary
Up to 3x speedup for cranks, especially under load
Breaking Changes