From 488d37f6c6ee96db74bfc37f76ea43b2bffda249 Mon Sep 17 00:00:00 2001 From: stanlou Date: Wed, 25 Feb 2026 18:15:11 +0100 Subject: [PATCH] wip: inject task --- .../src/protocol/production/flow/BatchFlow.ts | 14 +++- .../src/protocol/production/flow/BlockFlow.ts | 14 +++- .../production/flow/StateTransitionFlow.ts | 13 +++- .../production/flow/TransactionFlow.ts | 11 ++- .../production/tasks/BlockReductionTask.ts | 2 + .../production/tasks/CircuitCompilerTask.ts | 2 + .../protocol/production/tasks/NewBlockTask.ts | 2 + .../production/tasks/RuntimeProvingTask.ts | 3 +- .../tasks/StateTransitionReductionTask.ts | 2 + .../production/tasks/StateTransitionTask.ts | 2 + .../tasks/TransactionProvingTask.ts | 2 + .../tasks/TransactionReductionTask.ts | 2 + .../src/sequencer/SequencerStartupModule.ts | 15 ++++ .../src/settlement/SettlementModule.ts | 4 ++ .../settlement/tasks/SettlementProvingTask.ts | 2 + .../worker/worker/LocalTaskWorkerModule.ts | 69 ++++++++++++------- .../worker/startup/WorkerRegistrationTask.ts | 2 + packages/stack/src/presets/modules/index.ts | 4 +- 18 files changed, 133 insertions(+), 32 deletions(-) diff --git a/packages/sequencer/src/protocol/production/flow/BatchFlow.ts b/packages/sequencer/src/protocol/production/flow/BatchFlow.ts index e2267ec60..6a9a67729 100644 --- a/packages/sequencer/src/protocol/production/flow/BatchFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/BatchFlow.ts @@ -7,7 +7,7 @@ import { StateTransitionProverPublicOutput, TransactionProverPublicInput, } from "@proto-kit/protocol"; -import { isFull, mapSequential, Nullable } from "@proto-kit/common"; +import { isFull, mapSequential, Nullable, dependencyFactory, DependencyRecord } from "@proto-kit/common"; import { FlowCreator } from "../../../worker/flow/Flow"; import { NewBlockProvingParameters, NewBlockTask } from "../tasks/NewBlockTask"; @@ -22,6 +22,7 @@ import { BlockFlow } from "./BlockFlow"; @injectable() @scoped(Lifecycle.ContainerScoped) +@dependencyFactory() export class BatchFlow { public constructor( private readonly flowCreator: FlowCreator, @@ -35,6 +36,17 @@ export class BatchFlow { public readonly tracer: Tracer ) {} + public static dependencies(): DependencyRecord { + return { + blockProvingTask: { + useClass: NewBlockTask, + }, + blockReductionTask: { + useClass: BlockReductionTask, + }, + }; + } + private isBlockProofsMergable(a: BlockProof, b: BlockProof): boolean { // TODO Proper replication of merge logic return a.publicOutput.stateRoot diff --git a/packages/sequencer/src/protocol/production/flow/BlockFlow.ts b/packages/sequencer/src/protocol/production/flow/BlockFlow.ts index 874a4f03d..c7b2c97c6 100644 --- a/packages/sequencer/src/protocol/production/flow/BlockFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/BlockFlow.ts @@ -4,7 +4,7 @@ import { Protocol, TransactionProof, } from "@proto-kit/protocol"; -import { mapSequential } from "@proto-kit/common"; +import { mapSequential, dependencyFactory, DependencyRecord } from "@proto-kit/common"; // eslint-disable-next-line import/no-extraneous-dependencies import chunk from "lodash/chunk"; @@ -19,6 +19,7 @@ import { TransactionFlow } from "./TransactionFlow"; // TODO Rename to TransactionFlow @injectable() @scoped(Lifecycle.ContainerScoped) +@dependencyFactory() export class BlockFlow { public constructor( private readonly flowCreator: FlowCreator, @@ -29,6 +30,17 @@ export class BlockFlow { private readonly transactionMergeTask: TransactionReductionTask ) {} + public static dependencies(): DependencyRecord { + return { + transactionTask: { + useClass: TransactionProvingTask, + }, + transactionMergeTask: { + useClass: TransactionReductionTask, + }, + }; + } + private dummyProof: TransactionProof | undefined = undefined; private async dummyTransactionProof() { diff --git a/packages/sequencer/src/protocol/production/flow/StateTransitionFlow.ts b/packages/sequencer/src/protocol/production/flow/StateTransitionFlow.ts index 6321920b0..e432bb52c 100644 --- a/packages/sequencer/src/protocol/production/flow/StateTransitionFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/StateTransitionFlow.ts @@ -16,9 +16,11 @@ import { import { StateTransitionReductionTask } from "../tasks/StateTransitionReductionTask"; import { ReductionTaskFlow } from "./ReductionTaskFlow"; +import { dependencyFactory, DependencyRecord } from "@proto-kit/common"; @injectable() @scoped(Lifecycle.ContainerScoped) +@dependencyFactory() export class StateTransitionFlow { public constructor( @inject("Protocol") @@ -27,7 +29,16 @@ export class StateTransitionFlow { private readonly stateTransitionTask: StateTransitionTask, private readonly stateTransitionReductionTask: StateTransitionReductionTask ) {} - + public static dependencies(): DependencyRecord { + return { + stateTransitionTask: { + useClass: StateTransitionTask, + }, + stateTransitionReductionTask: { + useClass: StateTransitionReductionTask, + }, + }; + } private async dummySTProof(): Promise { const emptyInputOutput: StateTransitionProverPublicInput & StateTransitionProverPublicOutput = { diff --git a/packages/sequencer/src/protocol/production/flow/TransactionFlow.ts b/packages/sequencer/src/protocol/production/flow/TransactionFlow.ts index 5d968758a..850365caa 100644 --- a/packages/sequencer/src/protocol/production/flow/TransactionFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/TransactionFlow.ts @@ -1,5 +1,5 @@ import { injectable } from "tsyringe"; -import { assertSizeOneOrTwo } from "@proto-kit/common"; +import { assertSizeOneOrTwo, dependencyFactory, DependencyRecord } from "@proto-kit/common"; import { Flow, FlowCreator } from "../../../worker/flow/Flow"; import { @@ -10,12 +10,21 @@ import { RuntimeProvingTask } from "../tasks/RuntimeProvingTask"; import { TransactionTrace } from "../tracing/TransactionTracingService"; @injectable() +@dependencyFactory() export class TransactionFlow { public constructor( private readonly flowCreator: FlowCreator, private readonly runtimeProvingTask: RuntimeProvingTask ) {} + public static dependencies(): DependencyRecord { + return { + runtimeProvingTask: { + useClass: RuntimeProvingTask, + }, + }; + } + private async resolveTransactionFlow( flow: Flow<{ runtimeProofs: { proof: RuntimeProof; index: number }[]; diff --git a/packages/sequencer/src/protocol/production/tasks/BlockReductionTask.ts b/packages/sequencer/src/protocol/production/tasks/BlockReductionTask.ts index 24b5423a0..1aa54e0e8 100644 --- a/packages/sequencer/src/protocol/production/tasks/BlockReductionTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/BlockReductionTask.ts @@ -9,6 +9,7 @@ import { import { CompileRegistry, ProvableMethodExecutionContext, + implement, } from "@proto-kit/common"; import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule"; @@ -21,6 +22,7 @@ import { @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class BlockReductionTask extends TaskWorkerModule implements Task, BlockProof> diff --git a/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts b/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts index 2e14928d8..149367027 100644 --- a/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts @@ -9,6 +9,7 @@ import { CompilableModule, safeParseJson, reduceSequential, + implement, } from "@proto-kit/common"; import { MandatorySettlementModulesRecord, @@ -40,6 +41,7 @@ export type CompilerTaskParams = { @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class CircuitCompilerTask extends UnpreparingTask< CompilerTaskParams, ArtifactRecord diff --git a/packages/sequencer/src/protocol/production/tasks/NewBlockTask.ts b/packages/sequencer/src/protocol/production/tasks/NewBlockTask.ts index bac58cf3c..e1f615a56 100644 --- a/packages/sequencer/src/protocol/production/tasks/NewBlockTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/NewBlockTask.ts @@ -1,4 +1,5 @@ import { inject, injectable, Lifecycle, scoped } from "tsyringe"; +import { implement } from "@proto-kit/common"; import { BlockProvable, BlockProverPublicInput, @@ -55,6 +56,7 @@ export type NewBlockProvingParameters = PairingDerivedInput< @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class NewBlockTask extends TaskWorkerModule implements Task diff --git a/packages/sequencer/src/protocol/production/tasks/RuntimeProvingTask.ts b/packages/sequencer/src/protocol/production/tasks/RuntimeProvingTask.ts index 3fe0756d3..a6e6dbd1b 100644 --- a/packages/sequencer/src/protocol/production/tasks/RuntimeProvingTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/RuntimeProvingTask.ts @@ -10,7 +10,7 @@ import { RuntimeMethodExecutionContext, } from "@proto-kit/protocol"; import { Proof } from "o1js"; -import { CompileRegistry } from "@proto-kit/common"; +import { CompileRegistry, implement } from "@proto-kit/common"; import { Task, TaskSerializer } from "../../../worker/flow/Task"; import { ProofTaskSerializer } from "../../../helpers/utils"; @@ -31,6 +31,7 @@ export interface RuntimeProofParameters { @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class RuntimeProvingTask extends TaskWorkerModule implements Task diff --git a/packages/sequencer/src/protocol/production/tasks/StateTransitionReductionTask.ts b/packages/sequencer/src/protocol/production/tasks/StateTransitionReductionTask.ts index 940f1faaa..4a4c68dd3 100644 --- a/packages/sequencer/src/protocol/production/tasks/StateTransitionReductionTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/StateTransitionReductionTask.ts @@ -9,6 +9,7 @@ import { import { CompileRegistry, ProvableMethodExecutionContext, + implement, } from "@proto-kit/common"; import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule"; @@ -21,6 +22,7 @@ import { @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class StateTransitionReductionTask extends TaskWorkerModule implements Task, StateTransitionProof> diff --git a/packages/sequencer/src/protocol/production/tasks/StateTransitionTask.ts b/packages/sequencer/src/protocol/production/tasks/StateTransitionTask.ts index 532a0d5d4..acb625525 100644 --- a/packages/sequencer/src/protocol/production/tasks/StateTransitionTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/StateTransitionTask.ts @@ -14,6 +14,7 @@ import { ProvableMethodExecutionContext, CompileRegistry, LinkedMerkleTreeWitness, + implement, } from "@proto-kit/common"; import { Task, TaskSerializer } from "../../../worker/flow/Task"; @@ -31,6 +32,7 @@ export interface StateTransitionProofParameters { @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class StateTransitionTask extends TaskWorkerModule implements Task diff --git a/packages/sequencer/src/protocol/production/tasks/TransactionProvingTask.ts b/packages/sequencer/src/protocol/production/tasks/TransactionProvingTask.ts index 5f6896b99..1a57f5ca5 100644 --- a/packages/sequencer/src/protocol/production/tasks/TransactionProvingTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/TransactionProvingTask.ts @@ -13,6 +13,7 @@ import { inject, injectable, Lifecycle, scoped } from "tsyringe"; import { ProvableMethodExecutionContext, CompileRegistry, + implement, } from "@proto-kit/common"; import { ProofTaskSerializer } from "../../../helpers/utils"; @@ -49,6 +50,7 @@ export async function executeWithPrefilledStateService( @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class TransactionProvingTask extends TaskWorkerModule implements Task diff --git a/packages/sequencer/src/protocol/production/tasks/TransactionReductionTask.ts b/packages/sequencer/src/protocol/production/tasks/TransactionReductionTask.ts index 46e5ddac9..6ef865ffc 100644 --- a/packages/sequencer/src/protocol/production/tasks/TransactionReductionTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/TransactionReductionTask.ts @@ -9,6 +9,7 @@ import { import { CompileRegistry, ProvableMethodExecutionContext, + implement, } from "@proto-kit/common"; import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule"; @@ -21,6 +22,7 @@ import { @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class TransactionReductionTask extends TaskWorkerModule implements Task, TransactionProof> diff --git a/packages/sequencer/src/sequencer/SequencerStartupModule.ts b/packages/sequencer/src/sequencer/SequencerStartupModule.ts index 603835f7a..73db83b6d 100644 --- a/packages/sequencer/src/sequencer/SequencerStartupModule.ts +++ b/packages/sequencer/src/sequencer/SequencerStartupModule.ts @@ -12,6 +12,8 @@ import { ChildVerificationKeyService, CompileRegistry, AreProofsEnabled, + dependencyFactory, + DependencyRecord, } from "@proto-kit/common"; import { Flow, FlowCreator } from "../worker/flow/Flow"; @@ -23,12 +25,14 @@ import { import { VerificationKeyService } from "../protocol/runtime/RuntimeVerificationKeyService"; import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer"; import { NoopBaseLayer } from "../protocol/baselayer/NoopBaseLayer"; +import { WorkerRegistrationTask } from "../worker/worker/startup/WorkerRegistrationTask"; import { SequencerModule, sequencerModule } from "./builder/SequencerModule"; import { Closeable, closeable } from "./builder/Closeable"; @sequencerModule() @closeable() +@dependencyFactory() export class SequencerStartupModule extends SequencerModule implements Closeable @@ -50,6 +54,17 @@ export class SequencerStartupModule super(); } + public static dependencies(): DependencyRecord { + return { + compileTask: { + useClass: CircuitCompilerTask, + }, + workerRegistrationTask: { + useClass: WorkerRegistrationTask, + }, + }; + } + private async pushCompileTask( flow: Flow<{}>, payload: CompilerTaskParams diff --git a/packages/sequencer/src/settlement/SettlementModule.ts b/packages/sequencer/src/settlement/SettlementModule.ts index b5741080c..0b40020e7 100644 --- a/packages/sequencer/src/settlement/SettlementModule.ts +++ b/packages/sequencer/src/settlement/SettlementModule.ts @@ -38,6 +38,7 @@ import { AddressRegistry, InMemoryAddressRegistry, } from "./interactions/AddressRegistry"; +import { SettlementProvingTask } from "./tasks/SettlementProvingTask"; export type SettlementModuleConfig = { addresses?: { @@ -83,6 +84,9 @@ export class SettlementModule AddressRegistry: { useClass: InMemoryAddressRegistry, }, + settlementProvingTask: { + useClass: SettlementProvingTask, + }, }; } diff --git a/packages/sequencer/src/settlement/tasks/SettlementProvingTask.ts b/packages/sequencer/src/settlement/tasks/SettlementProvingTask.ts index c21d08d05..e138e8e84 100644 --- a/packages/sequencer/src/settlement/tasks/SettlementProvingTask.ts +++ b/packages/sequencer/src/settlement/tasks/SettlementProvingTask.ts @@ -33,6 +33,7 @@ import { fetchLastBlock, } from "o1js"; import { inject, injectable, Lifecycle, scoped } from "tsyringe"; +import { implement } from "@proto-kit/common"; import { ProofTaskSerializer, @@ -74,6 +75,7 @@ export class SomeProofSubclass extends Proof { */ @injectable() @scoped(Lifecycle.ContainerScoped) +@implement("Task") export class SettlementProvingTask extends TaskWorkerModule implements Task diff --git a/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts b/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts index 85fa05663..de7b3521d 100644 --- a/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts +++ b/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts @@ -64,48 +64,69 @@ export class LocalTaskWorkerModule public containerEvents = new EventEmitter(); - private worker?: FlowTaskWorker< - InstanceType[StringKeyOf]>[] - > = undefined; + private worker?: FlowTaskWorker = undefined; public static from( modules: Tasks ): TypedClass> { return class ScopedTaskWorkerModule extends LocalTaskWorkerModule { public constructor() { - super(modules); + super(); + this.definition = modules; + + const config = Object.keys(modules).reduce>( + (acc, moduleName) => { + this.assertIsValidModuleName(moduleName); + acc[moduleName] = {}; + return acc; + }, + {} + ); + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + this.currentConfig = config as ModulesConfig; } }; } - public constructor(modules: Tasks) { - super(modules); - - // Since we disabled configs for tasks, we initialize the config as empty here - const config = Object.keys(modules).reduce>( - (acc, moduleName) => { - this.assertIsValidModuleName(moduleName); - acc[moduleName] = {}; - return acc; - }, - {} - ); + public constructor() { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - this.currentConfig = config as ModulesConfig; + super({} as Tasks); } private taskQueue() { return this.container.resolve("TaskQueue"); } - public async start(): Promise { - const tasks = this.moduleNames.map((moduleName) => { - this.assertIsValidModuleName(moduleName); + private resolveTasks() { + if (this.moduleNames.length > 0) { + return this.moduleNames.map((moduleName) => { + this.assertIsValidModuleName(moduleName); + const task = this.resolve(moduleName); + log.debug(`Resolved task ${task.name}`); + return task; + }); + } + if (this.container.isRegistered("Task", true)) { + const injectedTasks = this.container.resolveAll< + TaskWorkerModule & Task + >("Task"); + + const tasksSet = new Set(); + return injectedTasks.filter((task) => { + if (tasksSet.has(task.name)) { + return false; + } + tasksSet.add(task.name); + return true; + }); + } - const task = this.resolve(moduleName); - log.debug(`Resolved task ${task.name}`); - return task; - }); + log.warn("No tasks found"); + return []; + } + + public async start(): Promise { + const tasks = this.resolveTasks(); const worker = new FlowTaskWorker(this.taskQueue(), [...tasks]); this.worker = worker; diff --git a/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts b/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts index 3d4474717..c1ef63f3a 100644 --- a/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts +++ b/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts @@ -5,6 +5,7 @@ import { CompileRegistry, safeParseJson, ModuleContainerLike, + implement, } from "@proto-kit/common"; import { inject, injectable } from "tsyringe"; import { @@ -39,6 +40,7 @@ export type WorkerStartupPayload = { }; @injectable() +@implement("Task") export class WorkerRegistrationTask extends AbstractStartupTask implements Task diff --git a/packages/stack/src/presets/modules/index.ts b/packages/stack/src/presets/modules/index.ts index 68f3bb86f..120057e88 100644 --- a/packages/stack/src/presets/modules/index.ts +++ b/packages/stack/src/presets/modules/index.ts @@ -176,9 +176,7 @@ export class DefaultModules { static remoteWorker() { return { TaskQueue: BullQueue, - LocalTaskWorkerModule: LocalTaskWorkerModule.from( - VanillaTaskWorkerModules.allTasks() - ), + LocalTaskWorkerModule: LocalTaskWorkerModule, } satisfies SequencerModulesRecord; }