
refactor(consensus): Delete Engine Task Queue#2538

Draft
refcell wants to merge 14 commits into rf/refactor/direct-engine-insert from rf/refactor/delete-engine-task-queue

Conversation

Contributor

@refcell refcell commented May 5, 2026

Summary

Delete the remaining engine priority queue, task enum, trait dispatch, enqueue, drain, and delegated/finalize wrapper structs. The engine processor now publishes direct-operation state changes without a pre-drain step, and Engine is a non-generic state owner with generic methods for the client-specific calls. The legacy dev queue-length RPC now reports zero without carrying queue state through the engine actor.

@refcell refcell added consensus Area: consensus stacked Meta: Stacked PR labels May 5, 2026
@refcell refcell self-assigned this May 5, 2026

vercel Bot commented May 5, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project | Deployment | Actions | Updated (UTC)
base | Ignored | Ignored, Preview | May 14, 2026 10:38pm


Comment on lines +138 to +143
Self::QueueLengthReceiver(subscription) => {
    let (_, queue_length_recv) = tokio::sync::watch::channel(0);
    subscription
        .send(queue_length_recv)
        .map_err(|_| EngineQueriesError::OutputChannelClosed)
}
Contributor

The sender (_) from watch::channel(0) is immediately dropped. When the dev RPC subscriber calls wait_for on this receiver (in dev.rs:80), it will get Err(RecvError) immediately because the sender is gone, causing the subscription loop to exit and log a "Subscription to engine queue size has been closed" warning on every subscription attempt.

If this is intentional stub behavior for a deprecated API, consider either:

  1. Documenting why the sender is intentionally dropped (e.g., a comment explaining the legacy stub), or
  2. Keeping the sender alive so the subscription hangs (returning 0 forever) rather than erroring out — which might be friendlier to existing callers expecting a live subscription.

refcell and others added 4 commits May 14, 2026 12:53
Move sequencer build and get-payload requests to direct Engine methods. Delete the GetPayloadTask wrapper and remove queued Build/GetPayload task variants while keeping insert and consolidation on the existing queue.

Co-authored-by: Codex <noreply@openai.com>
Forward direct build failures to callers, avoid inline temporary-error retry loops, and harden direct get-payload metrics and payload-version handling.

Co-authored-by: Codex <noreply@openai.com>
Merge identical temporary-severity match arms in SealTaskError.

Co-authored-by: Codex <noreply@openai.com>
Avoid unchanged direct payload state broadcasts and route direct get-payload failures through engine severity handling.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell force-pushed the rf/refactor/direct-engine-forkchoice-finalize branch 2 times, most recently from b24924e to e9568bc Compare May 14, 2026 18:47
Base automatically changed from rf/refactor/direct-engine-forkchoice-finalize to rf/refactor/direct-engine-insert May 14, 2026 19:18
refcell and others added 6 commits May 14, 2026 15:22
Move unsafe payload insertion into direct Engine methods and delete the InsertTask wrapper. Keep consolidation, delegated forkchoice, finalization, and seal on the existing queue.

Co-authored-by: Codex <noreply@openai.com>
Apply nightly rustfmt import grouping and collapse the insert acknowledgement send check for clippy.

Co-authored-by: Codex <noreply@openai.com>
Propagate local no-ack insert errors through the existing engine task severity handler. Avoid redundant V3/V4 insert payload clones and clarify the newPayload duration log field.

Co-authored-by: Codex <noreply@openai.com>
Reshape the unsafe-payload processor rstest table around a case struct so the generated test does not trip clippy's argument limit. Correct the service README's description of when the local insert acknowledgement is sent versus when the sequencer client waits on the unsafe-head watch channel.

Co-authored-by: Codex <noreply@openai.com>
Move safe-head consolidation onto direct Engine methods and route ProcessSafeL2SignalRequest through them. Delete the ConsolidateTask wrapper and remove unused queued seal/consolidate variants so the remaining task queue only covers delegated forkchoice and finalize.

Co-authored-by: Codex <noreply@openai.com>
Move delegated forkchoice and finalized-head updates onto direct Engine methods. Route the engine processor through those methods and remove the old task wrappers and queue variants.

Merge duplicated finalize test match arms, preserve stale finalized-head no-op behavior in the direct path, and keep derived docs/manifest updates from the rebased base branch.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell force-pushed the rf/refactor/direct-engine-insert branch from 0519c51 to 134b7c4 Compare May 14, 2026 19:33
refcell and others added 2 commits May 14, 2026 16:44
Co-authored-by: Codex <noreply@openai.com>
Delete the remaining engine priority queue, task enum, trait dispatch, enqueue, drain, and thin delegated/finalize wrappers. Make Engine a non-generic state owner with generic direct methods, and keep only a small severity helper plus processor-local operation error mapping for reset/flush/critical handling.

Co-authored-by: Codex <noreply@openai.com>
@refcell refcell force-pushed the rf/refactor/delete-engine-task-queue branch from 2dadbb3 to 6d83e9f Compare May 14, 2026 21:37
.await;

self.state_sender.send_replace(self.state);
Metrics::engine_task_count(Metrics::INSERT_TASK_LABEL).increment(1);
Contributor

engine_task_count is incremented unconditionally here (on both success and failure), but the metric is described as "Engine operations successfully executed" and retry_with_severity only increments it on success (line 294). This will inflate the success counter for failed local inserts.

Suggested change
- Metrics::engine_task_count(Metrics::INSERT_TASK_LABEL).increment(1);
+ if result.is_ok() {
+     Metrics::engine_task_count(Metrics::INSERT_TASK_LABEL).increment(1);
+ }

refcell and others added 2 commits May 14, 2026 17:47
Move repeated EngineClient bounds into the existing where clauses so clippy no longer reports multiple bound locations.

Co-authored-by: Codex <noreply@openai.com>
Remove the remaining SealTask wrapper and fold started-payload sealing into direct Engine methods. Rename the private task_queue module to operations and update stale queue-era docs and metrics.

Co-authored-by: Codex <noreply@openai.com>
@github-actions
Contributor

Review Summary

Clean refactor that removes the engine task queue abstraction in favor of direct operations on Engine. The Engine struct is now non-generic (no PhantomData<EngineClient>), with generics pushed to individual methods — a nice simplification. The EngineTaskExt trait, SealTask, BuildAndSealError, EngineTaskErrors, and build_and_seal utility are removed; their logic is inlined as Engine methods (seal_started_payload_with_state, insert_sealed_payload_with_state, rebuild_safe_payload). The EngineOperationError enum is correctly moved to the service crate where it belongs (the engine crate no longer needs to know about the aggregate error type).

Issues (2 findings, both covered by existing inline comments)

  1. Metrics inflation in insert_local_unsafe_payload (operations/core.rs:218): engine_task_count is incremented unconditionally (success and failure), unlike retry_with_severity which only increments on success. This inflates the "operations successfully executed" counter on failed local inserts.

  2. Immediately-dropped watch sender in QueueLengthReceiver shim (query.rs:139): let (_, queue_length_recv) = watch::channel(0) drops the sender, causing any wait_for caller to receive Err(RecvError) immediately. The TaskQueueLength shim (returns 0) is fine, but the subscription variant will error out on each subscription attempt rather than hanging with the shim value.

No new findings beyond the existing inline comments.

@refcell refcell force-pushed the rf/refactor/direct-engine-insert branch from 5794d4b to 4038c2c Compare May 15, 2026 16:22