diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp index a3e101135878..465ae0828bbc 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/append_only_tree/content_addressed_append_only_tree.hpp @@ -75,6 +75,17 @@ template class ContentAddressedAppendOn ContentAddressedAppendOnlyTree& operator=(ContentAddressedAppendOnlyTree const&& other) = delete; virtual ~ContentAddressedAppendOnlyTree() = default; + void clear_initialized_from_block() { store_->clear_initialized_from_block(); } + + void sync_pruning_meta(const TreeMeta& canonical) + { + TreeMeta meta; + store_->get_meta(meta); + meta.oldestHistoricBlock = canonical.oldestHistoricBlock; + meta.finalizedBlockHeight = canonical.finalizedBlockHeight; + store_->put_meta(meta); + } + /** * @brief Adds a single value to the end of the tree * @param value The value to be added diff --git a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp index 49a4dc1110cd..7fe848465dec 100644 --- a/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp +++ b/barretenberg/cpp/src/barretenberg/crypto/merkle_tree/node_store/cached_content_addressed_tree_store.hpp @@ -64,6 +64,8 @@ template class ContentAddressedCachedTreeStore { ContentAddressedCachedTreeStore& operator=(ContentAddressedCachedTreeStore const& other) = delete; ContentAddressedCachedTreeStore& operator=(ContentAddressedCachedTreeStore const&& other) = delete; + void clear_initialized_from_block() { forkConstantData_.initialized_from_block_.reset(); } + /** * @brief Returns the index of the leaf with a value immediately lower than the value provided */ diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp index 9087aceeb19e..4a41cbe9f4ce 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.cpp @@ -233,6 +233,10 @@ WorldStateWrapper::WorldStateWrapper(const Napi::CallbackInfo& info) WorldStateMessageType::DELETE_FORK, [this](msgpack::object& obj, msgpack::sbuffer& buffer) { return delete_fork(obj, buffer); }); + _dispatcher.register_target( + WorldStateMessageType::COMMIT_FORK, + [this](msgpack::object& obj, msgpack::sbuffer& buffer) { return commit_fork(obj, buffer); }); + _dispatcher.register_target( WorldStateMessageType::FINALIZE_BLOCKS, [this](msgpack::object& obj, msgpack::sbuffer& buffer) { return set_finalized(obj, buffer); }); @@ -787,6 +791,20 @@ bool WorldStateWrapper::delete_fork(msgpack::object& obj, msgpack::sbuffer& buf) return true; } +bool WorldStateWrapper::commit_fork(msgpack::object& obj, msgpack::sbuffer& buf) +{ + TypedMessage request; + obj.convert(request); + + WorldStateStatusFull status = _ws->commit_fork(request.value.forkId); + + MsgHeader header(request.header.messageId); + messaging::TypedMessage resp_msg(WorldStateMessageType::COMMIT_FORK, header, { status }); + msgpack::pack(buf, resp_msg); + + return true; +} + bool WorldStateWrapper::close(msgpack::object& obj, msgpack::sbuffer& buf) { HeaderOnlyMessage request; diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp index cd4f0d02e8e1..f9e523a8da8a 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state.hpp @@ -63,6 +63,7 @@ class WorldStateWrapper : public Napi::ObjectWrap { bool create_fork(msgpack::object& obj, msgpack::sbuffer& buffer); bool delete_fork(msgpack::object& obj, msgpack::sbuffer& buffer); + bool commit_fork(msgpack::object& obj, msgpack::sbuffer& buffer); bool close(msgpack::object& obj, msgpack::sbuffer& buffer); diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp index 8f6b481ad41a..406b72e14cad 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/world_state/world_state_message.hpp @@ -44,6 +44,7 @@ enum WorldStateMessageType { CREATE_FORK, DELETE_FORK, + COMMIT_FORK, FINALIZE_BLOCKS, UNWIND_BLOCKS, diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp index f221e93fcf6f..c7e0ec4a04fb 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.cpp @@ -192,6 +192,19 @@ Fork::SharedPtr WorldState::retrieve_fork(const uint64_t& forkId) const } return it->second; } + +Fork::SharedPtr WorldState::retrieve_and_remove_fork(const uint64_t& forkId) +{ + std::unique_lock lock(mtx); + auto it = _forks.find(forkId); + if (it == _forks.end()) { + throw std::runtime_error("Fork not found"); + } + Fork::SharedPtr fork = it->second; + _forks.erase(it); + return fork; +} + uint64_t WorldState::create_fork(const std::optional& blockNumber) { block_number_t blockNumberForFork = 0; @@ -234,13 +247,62 @@ void WorldState::delete_fork(const uint64_t& forkId) if (forkId == 0) { throw std::runtime_error("Unable to delete canonical fork"); } - // Retrieving the shared pointer here means we throw if the fork is not available, it also means we are not under a - // lock when we destroy the object - Fork::SharedPtr fork = retrieve_fork(forkId); - { - std::unique_lock lock(mtx); - _forks.erase(forkId); + // Atomically retrieve and remove so no concurrent caller can obtain a reference. + // The local shared_ptr ensures the fork is destroyed outside the lock. + Fork::SharedPtr fork = retrieve_and_remove_fork(forkId); +} + +WorldStateStatusFull WorldState::commit_fork(const uint64_t& forkId) +{ + if (forkId == CANONICAL_FORK_ID) { + throw std::runtime_error("Cannot commit the canonical fork"); } + validate_trees_are_equally_synched(); + + // Atomically retrieve and remove so no concurrent caller can obtain a reference. + // The local shared_ptr keeps the fork alive for the duration of this method. + Fork::SharedPtr fork = retrieve_and_remove_fork(forkId); + + // Validate tip hasn't moved since fork was created + auto archiveMeta = get_tree_info(WorldStateRevision::committed(), MerkleTreeId::ARCHIVE); + if (archiveMeta.meta.unfinalizedBlockHeight != fork->_blockNumber) { + throw std::runtime_error("Can't commit fork: canonical tip has moved from " + + std::to_string(fork->_blockNumber) + " to " + + std::to_string(archiveMeta.meta.unfinalizedBlockHeight)); + } + + // Rollback canonical to clear any uncommitted state + rollback(); + + // Save pruning-related meta from canonical before the fork overwrites it. + // The fork's cached meta may have stale oldestHistoricBlock/finalizedBlockHeight + // from when it was created, so commit_block would overwrite LMDB with stale values. + std::array canonicalMeta; + get_all_tree_info(WorldStateRevision::committed(), canonicalMeta); + + // Clear fork flags so commit_block() is allowed on fork stores + for (auto& [id, tree] : fork->_trees) { + std::visit([](auto&& wrapper) { wrapper.tree->clear_initialized_from_block(); }, tree); + } + + // Sync the fork's cached meta with canonical pruning state before committing. + // The fork's cached meta may have stale oldestHistoricBlock/finalizedBlockHeight. + for (auto& entry : fork->_trees) { + std::visit([&](auto&& wrapper) { wrapper.tree->sync_pruning_meta(canonicalMeta[entry.first]); }, entry.second); + } + + // Commit fork trees to LMDB + WorldStateStatusFull status; + auto [success, message] = commit(fork, status); + if (!success) { + throw std::runtime_error("Failed to commit fork: " + message); + } + + // Rollback canonical so it re-reads the updated LMDB state + rollback(); + + populate_status_summary(status); + return status; } Fork::SharedPtr WorldState::create_new_fork(const block_number_t& blockNumber) @@ -520,9 +582,13 @@ void WorldState::update_archive(const StateReference& block_state_ref, } std::pair WorldState::commit(WorldStateStatusFull& status) +{ + return commit(retrieve_fork(CANONICAL_FORK_ID), status); +} + +std::pair WorldState::commit(Fork::SharedPtr fork, WorldStateStatusFull& status) { // NOTE: the calling code is expected to ensure no other reads or writes happen during commit - Fork::SharedPtr fork = retrieve_fork(CANONICAL_FORK_ID); std::atomic_bool success = true; std::string message; Signal signal(static_cast(fork->_trees.size())); diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp index d7d8f99d46f5..b3bd9a5bbe7c 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.hpp @@ -274,6 +274,7 @@ class WorldState { uint64_t create_fork(const std::optional& blockNumber); void delete_fork(const uint64_t& forkId); + WorldStateStatusFull commit_fork(const uint64_t& forkId); WorldStateStatusSummary set_finalized_blocks(const block_number_t& toBlockNumber); WorldStateStatusFull unwind_blocks(const block_number_t& toBlockNumber); @@ -311,6 +312,7 @@ class WorldState { uint64_t maxReaders); Fork::SharedPtr retrieve_fork(const uint64_t& forkId) const; + Fork::SharedPtr retrieve_and_remove_fork(const uint64_t& forkId); Fork::SharedPtr create_new_fork(const block_number_t& blockNumber); void remove_forks_for_block(const block_number_t& blockNumber); @@ -343,6 +345,8 @@ class WorldState { static void populate_status_summary(WorldStateStatusFull& status); + std::pair commit(Fork::SharedPtr fork, WorldStateStatusFull& status); + template void commit_tree(TreeDBStats& dbStats, Signal& signal, diff --git a/barretenberg/cpp/src/barretenberg/world_state/world_state.test.cpp b/barretenberg/cpp/src/barretenberg/world_state/world_state.test.cpp index e28a3d4d9549..15bdd036fb17 100644 --- a/barretenberg/cpp/src/barretenberg/world_state/world_state.test.cpp +++ b/barretenberg/cpp/src/barretenberg/world_state/world_state.test.cpp @@ -896,6 +896,219 @@ TEST_F(WorldStateTest, BuildsABlockInAFork) EXPECT_EQ(fork_state_ref, ws.get_state_reference(WorldStateRevision::committed())); } +TEST_F(WorldStateTest, CommitForkHappyPath) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + auto fork_id = ws.create_fork(0); + + // Build a block on the fork (same pattern as BuildsABlockInAFork) + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 42 }, fork_id); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 43 }, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 129 } }, 0, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 129, 1 } }, 0, fork_id); + + auto fork_state_ref = ws.get_state_reference(WorldStateRevision{ .forkId = fork_id, .includeUncommitted = true }); + ws.update_archive(fork_state_ref, { 1 }, fork_id); + + // Commit the fork + WorldStateStatusFull status = ws.commit_fork(fork_id); + EXPECT_EQ(status.summary.unfinalizedBlockNumber, 1); + + // Verify canonical committed state has the new leaves + assert_leaf_value(ws, WorldStateRevision::committed(), MerkleTreeId::NOTE_HASH_TREE, 0, fr(42)); + assert_leaf_value(ws, WorldStateRevision::committed(), MerkleTreeId::L1_TO_L2_MESSAGE_TREE, 0, fr(43)); + assert_leaf_value(ws, WorldStateRevision::committed(), MerkleTreeId::NULLIFIER_TREE, 128, NullifierLeafValue(129)); + assert_leaf_value( + ws, WorldStateRevision::committed(), MerkleTreeId::PUBLIC_DATA_TREE, 128, PublicDataLeafValue(129, 1)); + + // Verify state reference matches + EXPECT_EQ(fork_state_ref, ws.get_state_reference(WorldStateRevision::committed())); +} + +TEST_F(WorldStateTest, CommitForkThenCreateNewForkAtAdvancedTip) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + + // Build and commit block 1 + auto fork1 = ws.create_fork(0); + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 42 }, fork1); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 43 }, fork1); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 129 } }, 0, fork1); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 129, 1 } }, 0, fork1); + auto state_ref1 = ws.get_state_reference(WorldStateRevision{ .forkId = fork1, .includeUncommitted = true }); + ws.update_archive(state_ref1, { 1 }, fork1); + WorldStateStatusFull status1 = ws.commit_fork(fork1); + EXPECT_EQ(status1.summary.unfinalizedBlockNumber, 1); + + // Create new fork at latest — should be at block 1 + auto fork2 = ws.create_fork(std::nullopt); + + // Build and commit block 2 + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 44 }, fork2); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 45 }, fork2); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 130 } }, 0, fork2); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 130, 2 } }, 0, fork2); + auto state_ref2 = ws.get_state_reference(WorldStateRevision{ .forkId = fork2, .includeUncommitted = true }); + ws.update_archive(state_ref2, { 2 }, fork2); + WorldStateStatusFull status2 = ws.commit_fork(fork2); + EXPECT_EQ(status2.summary.unfinalizedBlockNumber, 2); + + // Verify both blocks' leaves are visible in canonical + assert_leaf_value(ws, WorldStateRevision::committed(), MerkleTreeId::NOTE_HASH_TREE, 0, fr(42)); + assert_leaf_value(ws, WorldStateRevision::committed(), MerkleTreeId::NOTE_HASH_TREE, 1, fr(44)); +} + +TEST_F(WorldStateTest, CommitForkRejectsWhenTipMoved) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + + // Build block 1 using a temporary fork to get correct state references + auto tmp_fork = ws.create_fork(0); + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 42 }, tmp_fork); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 43 }, tmp_fork); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 129 } }, 0, tmp_fork); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 129, 1 } }, 0, tmp_fork); + auto state_ref1 = ws.get_state_reference(WorldStateRevision{ .forkId = tmp_fork, .includeUncommitted = true }); + ws.delete_fork(tmp_fork); + ws.sync_block(state_ref1, fr(1), { 42 }, { 43 }, { NullifierLeafValue(129) }, { { PublicDataLeafValue(129, 1) } }); + + // Create fork at block 1 — this is the fork we'll try to commit later + auto fork_id = ws.create_fork(1); + + // Build and sync block 2 to advance canonical tip + auto tmp_fork2 = ws.create_fork(1); + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 44 }, tmp_fork2); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 45 }, tmp_fork2); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 130 } }, 0, tmp_fork2); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 130, 2 } }, 0, tmp_fork2); + auto state_ref2 = ws.get_state_reference(WorldStateRevision{ .forkId = tmp_fork2, .includeUncommitted = true }); + ws.delete_fork(tmp_fork2); + ws.sync_block(state_ref2, fr(2), { 44 }, { 45 }, { NullifierLeafValue(130) }, { { PublicDataLeafValue(130, 2) } }); + + // commit_fork should reject because tip moved from 1 to 2 + EXPECT_THROW(ws.commit_fork(fork_id), std::runtime_error); + + // Verify canonical is still at block 2, not corrupted + auto archive_info = ws.get_tree_info(WorldStateRevision::committed(), MerkleTreeId::ARCHIVE); + EXPECT_EQ(archive_info.meta.unfinalizedBlockHeight, 2); +} + +TEST_F(WorldStateTest, CommitForkRejectsCanonicalForkId) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + EXPECT_THROW(ws.commit_fork(CANONICAL_FORK_ID), std::runtime_error); +} + +TEST_F(WorldStateTest, CommitForkRejectsInvalidForkId) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + EXPECT_THROW(ws.commit_fork(99999), std::runtime_error); +} + +TEST_F(WorldStateTest, CommitForkCanonicalReadsReflectData) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + auto fork_id = ws.create_fork(0); + + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 42 }, fork_id); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 43 }, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 129 } }, 0, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 129, 1 } }, 0, fork_id); + auto fork_state_ref = ws.get_state_reference(WorldStateRevision{ .forkId = fork_id, .includeUncommitted = true }); + ws.update_archive(fork_state_ref, { 1 }, fork_id); + + ws.commit_fork(fork_id); + + // Verify leaves are searchable from canonical + assert_leaf_exists(ws, WorldStateRevision::committed(), MerkleTreeId::NOTE_HASH_TREE, fr(42), true); + assert_leaf_exists(ws, WorldStateRevision::committed(), MerkleTreeId::L1_TO_L2_MESSAGE_TREE, fr(43), true); + assert_leaf_exists( + ws, WorldStateRevision::committed(), MerkleTreeId::NULLIFIER_TREE, NullifierLeafValue(129), true); + assert_leaf_exists( + ws, WorldStateRevision::committed(), MerkleTreeId::PUBLIC_DATA_TREE, PublicDataLeafValue(129, 1), true); + + // Verify tree sizes advanced + assert_tree_size(ws, WorldStateRevision::committed(), MerkleTreeId::NOTE_HASH_TREE, 1); + assert_tree_size(ws, WorldStateRevision::committed(), MerkleTreeId::L1_TO_L2_MESSAGE_TREE, 1); + assert_tree_size(ws, WorldStateRevision::committed(), MerkleTreeId::NULLIFIER_TREE, 129); + assert_tree_size(ws, WorldStateRevision::committed(), MerkleTreeId::PUBLIC_DATA_TREE, 129); +} + +TEST_F(WorldStateTest, CommitForkDestroysFork) +{ + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + auto fork_id = ws.create_fork(0); + + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 42 }, fork_id); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { 43 }, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 129 } }, 0, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 129, 1 } }, 0, fork_id); + auto fork_state_ref = ws.get_state_reference(WorldStateRevision{ .forkId = fork_id, .includeUncommitted = true }); + ws.update_archive(fork_state_ref, { 1 }, fork_id); + + ws.commit_fork(fork_id); + + // Fork should be destroyed — operations on it should fail + EXPECT_THROW(ws.delete_fork(fork_id), std::runtime_error); + EXPECT_THROW(ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { 99 }, fork_id), std::runtime_error); +} + +TEST_F(WorldStateTest, CommitForkDoesNotRollBackOldestHistoricBlock) +{ + // Reproduces: fork is created, then blocks are pruned on canonical BEFORE commit_fork. + // Without the fix, commit_fork overwrites oldestHistoricBlock with the stale fork value, + // causing subsequent remove_historical_blocks to fail with "Failed to read block data". + WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); + + // Sync blocks 1..4 via sync_block + for (uint32_t i = 1; i <= 4; i++) { + auto tmp = ws.create_fork(i - 1); + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { fr(i * 10) }, tmp); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { fr(i * 10 + 1) }, tmp); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 128 + i } }, 0, tmp); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 128 + i, i } }, 0, tmp); + auto sr = ws.get_state_reference(WorldStateRevision{ .forkId = tmp, .includeUncommitted = true }); + ws.delete_fork(tmp); + ws.sync_block(sr, + fr(i), + { fr(i * 10) }, + { fr(i * 10 + 1) }, + { NullifierLeafValue(128 + i) }, + { { PublicDataLeafValue(128 + i, i) } }); + } + + // Finalize block 2 so we can prune + ws.set_finalized_blocks(2); + + // Create fork at block 4 BEFORE pruning — fork captures oldestHistoricBlock = 1 + auto fork_id = ws.create_fork(4); + ws.append_leaves(MerkleTreeId::NOTE_HASH_TREE, { fr(50) }, fork_id); + ws.append_leaves(MerkleTreeId::L1_TO_L2_MESSAGE_TREE, { fr(51) }, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::NULLIFIER_TREE, { { 133 } }, 0, fork_id); + ws.batch_insert_indexed_leaves(MerkleTreeId::PUBLIC_DATA_TREE, { { 133, 5 } }, 0, fork_id); + auto fork_sr = ws.get_state_reference(WorldStateRevision{ .forkId = fork_id, .includeUncommitted = true }); + ws.update_archive(fork_sr, { 5 }, fork_id); + + // Prune blocks 1..2 AFTER fork was created — canonical advances oldestHistoricBlock to 2 + ws.remove_historical_blocks(2); + auto info_after_prune = ws.get_tree_info(WorldStateRevision::committed(), MerkleTreeId::ARCHIVE); + EXPECT_EQ(info_after_prune.meta.oldestHistoricBlock, 2); + + // commit_fork — must NOT roll back oldestHistoricBlock from 2 to 1 + ws.commit_fork(fork_id); + + auto info_after_commit = ws.get_tree_info(WorldStateRevision::committed(), MerkleTreeId::ARCHIVE); + EXPECT_EQ(info_after_commit.meta.unfinalizedBlockHeight, 5); + EXPECT_EQ(info_after_commit.meta.oldestHistoricBlock, 2); + + // Finalize block 4 and prune — should NOT throw "Failed to read block data" + ws.set_finalized_blocks(4); + EXPECT_NO_THROW(ws.remove_historical_blocks(4)); + + auto info_final = ws.get_tree_info(WorldStateRevision::committed(), MerkleTreeId::ARCHIVE); + EXPECT_EQ(info_final.meta.oldestHistoricBlock, 4); +} + TEST_F(WorldStateTest, GetBlockForIndex) { WorldState ws(thread_pool_size, data_dir, map_size, tree_heights, tree_prefill, initial_header_generator_point); diff --git a/yarn-project/prover-client/src/light/lightweight_checkpoint_builder.ts b/yarn-project/prover-client/src/light/lightweight_checkpoint_builder.ts index 1e72a75d7684..cdb0a47dc5e5 100644 --- a/yarn-project/prover-client/src/light/lightweight_checkpoint_builder.ts +++ b/yarn-project/prover-client/src/light/lightweight_checkpoint_builder.ts @@ -42,13 +42,18 @@ export class LightweightCheckpointBuilder { private blocks: L2Block[] = []; private blobFields: Fr[] = []; + /** Replaces the database used for subsequent block builds. */ + setDb(db: MerkleTreeWriteOperations): void { + this.db = db; + } + constructor( public readonly checkpointNumber: CheckpointNumber, public readonly constants: CheckpointGlobalVariables, public feeAssetPriceModifier: bigint, public readonly l1ToL2Messages: Fr[], private readonly previousCheckpointOutHashes: Fr[], - public readonly db: MerkleTreeWriteOperations, + private db: MerkleTreeWriteOperations, bindings?: LoggerBindings, ) { this.logger = createLogger('checkpoint-builder', { diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts index 17703d4a016d..51792ae6b81f 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts @@ -264,6 +264,8 @@ export class CheckpointProposalJob implements Traceable { }; }) private async proposeCheckpoint(): Promise { + let fork: Awaited> | undefined; + let checkpointBuilder: CheckpointBuilder | undefined; try { // Get operator configured coinbase and fee recipient for this attestor const coinbase = this.validatorClient.getCoinbaseForAttestor(this.attestorAddress); @@ -318,11 +320,13 @@ export class CheckpointProposalJob implements Traceable { // Get the fee asset price modifier from the oracle const feeAssetPriceModifier = await this.publisher.getFeeAssetPriceModifier(); - // Create a long-lived forked world state for the checkpoint builder - await using fork = await this.worldState.fork(this.syncedToBlockNumber, { closeDelayMs: 12_000 }); + // Create a forked world state for the checkpoint builder. + // The fork is registered for each built block so SYNC_BLOCK can commit it. + // After each block, a new fork is created at the advanced tip. + fork = await this.worldState.fork(this.syncedToBlockNumber, { closeDelayMs: 12_000 }); // Create checkpoint builder for the entire slot - const checkpointBuilder = await this.checkpointsBuilder.startCheckpoint( + checkpointBuilder = await this.checkpointsBuilder.startCheckpoint( this.checkpointNumber, checkpointGlobalVariables, feeAssetPriceModifier, @@ -486,6 +490,13 @@ export class CheckpointProposalJob implements Traceable { this.log.error(`Error building checkpoint at slot ${this.targetSlot}`, err); return undefined; + } finally { + // Close forks to release native resources. Already-committed forks silently handle "Fork not found". + const currentFork = checkpointBuilder?.getFork(); + if (currentFork && currentFork !== fork) { + await currentFork.close(); + } + await fork?.close(); } } @@ -567,8 +578,11 @@ export class CheckpointProposalJob implements Traceable { blocksInCheckpoint.push(block); usedTxs.forEach(tx => txHashesAlreadyIncluded.add(tx.txHash.toString())); - // If this is the last block, sync it to the archiver and exit the loop - // so we can build the checkpoint and start collecting attestations. + // Register the fork so SYNC_BLOCK can commit it instead of recalculating. + this.worldState.registerForkForBlock(block.archive.root, checkpointBuilder.getForkId()); + + // If this is the last block, exit the loop so we can build the checkpoint and start collecting attestations. + // The block will be synced to LMDB when the block stream picks it up from the archiver. if (timingInfo.isLastBlock) { await this.syncProposedBlockToArchiver(block); this.log.verbose(`Completed final block ${blockNumber} for slot ${this.targetSlot}`, { @@ -587,13 +601,21 @@ export class CheckpointProposalJob implements Traceable { const proposal = await this.createBlockProposal(block, inHash, usedTxs, blockProposalOptions); // Sync the proposed block to the archiver to make it available, only after we've managed to sign the proposal. - // We wait for the sync to succeed, as this helps catch consistency errors, even if it means we lose some time for block-building. - // If this throws, we abort the entire checkpoint. await this.syncProposedBlockToArchiver(block); // Once we have a signed proposal and the archiver agreed with our proposed block, then we broadcast it. proposal && (await this.p2pClient.broadcastProposal(proposal)); + // Wait for LMDB to reach this block before creating the next fork. + // In HA mode, another peer's proposal may arrive via gossip and be committed instead of ours. + // If the block was not pushed to the archiver (e.g. skipPushProposedBlocksToArchiver), skip the + // sync and reuse the current fork for the next block. + if (!this.config.skipPushProposedBlocksToArchiver) { + await this.worldState.syncImmediate(blockNumber); + const newFork = await this.worldState.fork(undefined, { closeDelayMs: 12_000 }); + await checkpointBuilder.setFork(newFork); + } + // Wait until the next block's start time await this.waitUntilNextSubslot(timingInfo.deadline); } diff --git a/yarn-project/sequencer-client/src/test/mock_checkpoint_builder.ts b/yarn-project/sequencer-client/src/test/mock_checkpoint_builder.ts index f0a6afca82cc..84f54cbc204b 100644 --- a/yarn-project/sequencer-client/src/test/mock_checkpoint_builder.ts +++ b/yarn-project/sequencer-client/src/test/mock_checkpoint_builder.ts @@ -66,6 +66,18 @@ export class MockCheckpointBuilder implements ICheckpointBlockBuilder { return this; } + getFork(): MerkleTreeWriteOperations { + return { close: () => Promise.resolve() } as unknown as MerkleTreeWriteOperations; + } + + getForkId(): number { + return 0; + } + + async setFork(_fork: MerkleTreeWriteOperations): Promise { + // No-op in mock — the mock doesn't use the fork directly + } + getConstantData(): CheckpointGlobalVariables { return this.constants; } diff --git a/yarn-project/simulator/src/public/hinting_db_sources.ts b/yarn-project/simulator/src/public/hinting_db_sources.ts index 85f8ab422ccf..1cb2b75a0fb7 100644 --- a/yarn-project/simulator/src/public/hinting_db_sources.ts +++ b/yarn-project/simulator/src/public/hinting_db_sources.ts @@ -237,6 +237,10 @@ export class HintingMerkleWriteOperations implements MerkleTreeWriteOperations { } // Use create() to instantiate. + get forkId(): number { + return this.db.forkId; + } + private constructor( private db: MerkleTreeWriteOperations, private hints: AvmExecutionHints, diff --git a/yarn-project/simulator/src/public/public_processor/guarded_merkle_tree.ts b/yarn-project/simulator/src/public/public_processor/guarded_merkle_tree.ts index 71133c4a2ebf..b28c852b5926 100644 --- a/yarn-project/simulator/src/public/public_processor/guarded_merkle_tree.ts +++ b/yarn-project/simulator/src/public/public_processor/guarded_merkle_tree.ts @@ -25,6 +25,10 @@ export class GuardedMerkleTreeOperations implements MerkleTreeWriteOperations { private isStopped = false; private serialQueue = new SerialQueue(); + get forkId(): number { + return this.target.forkId; + } + constructor(private target: MerkleTreeWriteOperations) { this.serialQueue.start(); } diff --git a/yarn-project/stdlib/src/interfaces/block-builder.ts b/yarn-project/stdlib/src/interfaces/block-builder.ts index 6a2f49bb4209..0bfd6aa66487 100644 --- a/yarn-project/stdlib/src/interfaces/block-builder.ts +++ b/yarn-project/stdlib/src/interfaces/block-builder.ts @@ -135,6 +135,15 @@ export interface ICheckpointBlockBuilder { timestamp: bigint, opts: BlockBuilderOptions, ): Promise; + + /** Returns the current fork. */ + getFork(): MerkleTreeWriteOperations; + + /** Returns the native fork ID of the current fork. */ + getForkId(): number; + + /** Replaces the fork used for subsequent block builds, closing the previous one. */ + setFork(fork: MerkleTreeWriteOperations): Promise; } /** Interface for creating checkpoint builders. */ diff --git a/yarn-project/stdlib/src/interfaces/merkle_tree_operations.ts b/yarn-project/stdlib/src/interfaces/merkle_tree_operations.ts index 29625e9d4c43..7f1edc398f7c 100644 --- a/yarn-project/stdlib/src/interfaces/merkle_tree_operations.ts +++ b/yarn-project/stdlib/src/interfaces/merkle_tree_operations.ts @@ -245,6 +245,9 @@ export interface MerkleTreeWriteOperations extends MerkleTreeReadOperations, MerkleTreeCheckpointOperations, AsyncDisposable { + /** The native fork ID assigned by the world state. */ + readonly forkId: number; + /** * Appends leaves to a given tree. * @param treeId - The tree to be updated. diff --git a/yarn-project/stdlib/src/interfaces/world_state.ts b/yarn-project/stdlib/src/interfaces/world_state.ts index a99bcc1fa35f..564be5ad7c13 100644 --- a/yarn-project/stdlib/src/interfaces/world_state.ts +++ b/yarn-project/stdlib/src/interfaces/world_state.ts @@ -1,4 +1,5 @@ import { BlockNumber, BlockNumberSchema } from '@aztec/foundation/branded-types'; +import type { Fr } from '@aztec/foundation/curves/bn254'; import type { PromiseWithResolvers } from '@aztec/foundation/promise'; import { z } from 'zod'; @@ -51,6 +52,14 @@ export interface ForkMerkleTreeOperations { */ fork(block?: BlockNumber, opts?: { closeDelayMs?: number }): Promise; + /** + * Registers a fork that has built a block. When SYNC_BLOCK is later called for a block + * with the same archive root, the fork will be committed instead of recalculating from scratch. + * @param archiveRoot - The archive root of the block built on the fork. + * @param forkId - The native fork ID. + */ + registerForkForBlock(archiveRoot: Fr, forkId: number): void; + /** Backups the db to the target path. */ backupTo(dstPath: string, compact?: boolean): Promise, string>>; } diff --git a/yarn-project/txe/src/state_machine/synchronizer.ts b/yarn-project/txe/src/state_machine/synchronizer.ts index 8e217f4785d1..6298ca9cd8ca 100644 --- a/yarn-project/txe/src/state_machine/synchronizer.ts +++ b/yarn-project/txe/src/state_machine/synchronizer.ts @@ -47,6 +47,11 @@ export class TXESynchronizer implements WorldStateSynchronizer { return this.nativeWorldStateService.getCommitted(); } + /** Registers a fork for a block (no-op in TXE). */ + public registerForkForBlock(_archiveRoot: Fr, _forkId: number): void { + // No-op — TXE doesn't use the block stream pipeline + } + /** Forks the world state at the given block number, defaulting to the latest one. */ public fork(block?: number): Promise { return this.nativeWorldStateService.fork(block ? BlockNumber(block) : undefined); diff --git a/yarn-project/validator-client/src/checkpoint_builder.ts b/yarn-project/validator-client/src/checkpoint_builder.ts index 05489c21e809..386a17d625b7 100644 --- a/yarn-project/validator-client/src/checkpoint_builder.ts +++ b/yarn-project/validator-client/src/checkpoint_builder.ts @@ -50,6 +50,23 @@ export class CheckpointBuilder implements ICheckpointBlockBuilder { /** Persistent contracts DB shared across all blocks in this checkpoint. */ protected contractsDB: PublicContractsDB; + /** Returns the current fork. */ + public getFork(): MerkleTreeWriteOperations { + return this.fork; + } + + /** Returns the native fork ID. */ + public getForkId(): number { + return this.fork.forkId; + } + + /** Replaces the fork used for subsequent block builds, closing the previous one. */ + public async setFork(fork: MerkleTreeWriteOperations): Promise { + await this.fork.close(); + this.fork = fork; + this.checkpointBuilder.setDb(fork); + } + constructor( private checkpointBuilder: LightweightCheckpointBuilder, private fork: MerkleTreeWriteOperations, diff --git a/yarn-project/validator-client/src/proposal_handler.ts b/yarn-project/validator-client/src/proposal_handler.ts index af81e14fc8a7..17911b7bcbd9 100644 --- a/yarn-project/validator-client/src/proposal_handler.ts +++ b/yarn-project/validator-client/src/proposal_handler.ts @@ -18,7 +18,12 @@ import type { BlockData, L2Block, L2BlockSink, L2BlockSource } from '@aztec/stdl import { validateCheckpoint } from '@aztec/stdlib/checkpoint'; import { getEpochAtSlot, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; import { Gas } from '@aztec/stdlib/gas'; -import type { ITxProvider, ValidatorClientFullConfig, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import type { + ITxProvider, + MerkleTreeWriteOperations, + ValidatorClientFullConfig, + WorldStateSynchronizer, +} from '@aztec/stdlib/interfaces/server'; import { type L1ToL2MessageSource, accumulateCheckpointOutHashes, @@ -341,6 +346,11 @@ export class ProposalHandler { .filter(c => c.checkpointNumber < checkpointNumber) .map(c => c.checkpointOutHash); + // Fork before the block to be built + const parentBlockNumber = BlockNumber(blockNumber - 1); + await this.worldState.syncImmediate(parentBlockNumber); + await using fork = await this.worldState.fork(parentBlockNumber); + // Try re-executing the transactions in the proposal if needed let reexecutionResult; try { @@ -352,6 +362,7 @@ export class ProposalHandler { txs, l1ToL2Messages, previousCheckpointOutHashes, + fork, ); } catch (error) { this.log.error(`Error reexecuting txs while processing block proposal`, error, proposalInfo); @@ -359,9 +370,11 @@ export class ProposalHandler { return { isValid: false, blockNumber, reason, reexecutionResult }; } - // If we succeeded, push this block into the archiver (unless disabled) + // If we succeeded, push this block into the archiver and commit the fork to LMDB if (reexecutionResult?.block && this.config.skipPushProposedBlocksToArchiver === false) { + this.worldState.registerForkForBlock(reexecutionResult.block.archive.root, fork.forkId); await this.blockSource.addBlock(reexecutionResult.block); + await this.worldState.syncImmediate(blockNumber); } this.log.info( @@ -595,6 +608,7 @@ export class ProposalHandler { txs: Tx[], l1ToL2Messages: Fr[], previousCheckpointOutHashes: Fr[], + fork: MerkleTreeWriteOperations, ): Promise { const { blockHeader, txHashes } = proposal; @@ -613,11 +627,6 @@ export class ProposalHandler { const allBlocksInSlot = await this.blockSource.getBlocksForSlot(slot); const priorBlocks = allBlocksInSlot.filter(b => b.number < blockNumber && b.header.getSlot() === slot); - // Fork before the block to be built - const parentBlockNumber = BlockNumber(blockNumber - 1); - await this.worldState.syncImmediate(parentBlockNumber); - await using fork = await this.worldState.fork(parentBlockNumber); - // Verify the fork's archive root matches the proposal's expected last archive. // If they don't match, our world state synced to a different chain and reexecution would fail. const forkArchiveRoot = new Fr((await fork.getTreeInfo(MerkleTreeId.ARCHIVE)).root); diff --git a/yarn-project/validator-client/src/validator.integration.test.ts b/yarn-project/validator-client/src/validator.integration.test.ts index 6b93351cf590..dd0efba88cc5 100644 --- a/yarn-project/validator-client/src/validator.integration.test.ts +++ b/yarn-project/validator-client/src/validator.integration.test.ts @@ -31,7 +31,7 @@ import { mockTx } from '@aztec/stdlib/testing'; import type { PublicDataTreeLeaf } from '@aztec/stdlib/trees'; import { BlockHeader, type CheckpointGlobalVariables, Tx } from '@aztec/stdlib/tx'; import { ServerWorldStateSynchronizer } from '@aztec/world-state'; -import { NativeWorldStateService } from '@aztec/world-state/native'; +import { NativeWorldStateService, WorldStateMessageType } from '@aztec/world-state/native'; import { getGenesisValues } from '@aztec/world-state/testing'; import { describe, expect, it, jest } from '@jest/globals'; @@ -379,6 +379,31 @@ describe('ValidatorClient Integration', () => { }); describe('happy path', () => { + it('uses COMMIT_FORK instead of SYNC_BLOCK when validating blocks', async () => { + const blockCount = 5; + const { blocks } = await buildCheckpoint( + CheckpointNumber(1), + slotNumber, + emptyL1ToL2Messages, + emptyPreviousCheckpointOutHashes, + BlockNumber(1), + blockCount, + () => buildTxs(2), + ); + + // Spy on the attestor's native world state to track message types sent to C++ + const instance = (attestor.worldStateDb as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + await attestorValidateBlocks(blocks); + + const messageTypes = callSpy.mock.calls.map(call => call[0] as WorldStateMessageType); + expect(messageTypes.filter(t => t === WorldStateMessageType.COMMIT_FORK)).toHaveLength(blockCount); + expect(messageTypes.filter(t => t === WorldStateMessageType.SYNC_BLOCK)).toHaveLength(0); + + callSpy.mockRestore(); + }); + it('validates multiple blocks and attests to checkpoint', async () => { const { blocks, proposal } = await buildCheckpoint( CheckpointNumber(1), diff --git a/yarn-project/world-state/src/native/index.ts b/yarn-project/world-state/src/native/index.ts index 133319956c9d..775f3fb7858f 100644 --- a/yarn-project/world-state/src/native/index.ts +++ b/yarn-project/world-state/src/native/index.ts @@ -1,2 +1,3 @@ export * from './native_world_state.js'; export * from './fork_checkpoint.js'; +export { WorldStateMessageType } from './message.js'; diff --git a/yarn-project/world-state/src/native/merkle_trees_facade.ts b/yarn-project/world-state/src/native/merkle_trees_facade.ts index b8d4ca92b3e0..0eb844fb5460 100644 --- a/yarn-project/world-state/src/native/merkle_trees_facade.ts +++ b/yarn-project/world-state/src/native/merkle_trees_facade.ts @@ -218,6 +218,11 @@ export class MerkleTreesForkFacade extends MerkleTreesFacade implements MerkleTr assert.equal(revision.includeUncommitted, true, 'Fork must include uncommitted data'); super(instance, initialHeader, revision); } + + /** Returns the native fork ID. */ + get forkId(): number { + return this.revision.forkId; + } async updateArchive(header: BlockHeader): Promise { await this.instance.call(WorldStateMessageType.UPDATE_ARCHIVE, { forkId: this.revision.forkId, @@ -295,9 +300,9 @@ export class MerkleTreesForkFacade extends MerkleTreesFacade implements MerkleTr try { await this.instance.call(WorldStateMessageType.DELETE_FORK, { forkId: this.revision.forkId }); } catch (err: any) { - // Ignore errors due to native instance being closed during shutdown. - // This can happen when validators are still processing block proposals while the node is stopping. - if (err?.message === 'Native instance is closed') { + // Ignore errors due to native instance being closed during shutdown, or fork already + // destroyed (e.g. via commitFork). + if (err?.message === 'Native instance is closed' || err?.message === 'Fork not found') { return; } throw err; @@ -309,8 +314,12 @@ export class MerkleTreesForkFacade extends MerkleTreesFacade implements MerkleTr void sleep(this.opts.closeDelayMs) .then(() => this.close()) .catch(err => { - if (err && 'message' in err && err.message === 'Native instance is closed') { - return; // Ignore errors due to native instance being closed + if ( + err && + 'message' in err && + (err.message === 'Native instance is closed' || err.message === 'Fork not found') + ) { + return; } this.log.warn('Error closing MerkleTreesForkFacade after delay', { err }); }); diff --git a/yarn-project/world-state/src/native/message.ts b/yarn-project/world-state/src/native/message.ts index edceed40e4b3..6191b5aa977f 100644 --- a/yarn-project/world-state/src/native/message.ts +++ b/yarn-project/world-state/src/native/message.ts @@ -34,6 +34,7 @@ export enum WorldStateMessageType { CREATE_FORK, DELETE_FORK, + COMMIT_FORK, FINALIZE_BLOCKS, UNWIND_BLOCKS, @@ -441,6 +442,10 @@ interface CreateForkResponse { interface DeleteForkRequest extends WithForkId {} +interface CommitForkRequest extends WithCanonicalForkId { + forkId: number; +} + interface CopyStoresRequest extends WithCanonicalForkId { dstPath: string; compact: boolean; @@ -487,6 +492,7 @@ export type WorldStateRequest = { [WorldStateMessageType.CREATE_FORK]: CreateForkRequest; [WorldStateMessageType.DELETE_FORK]: DeleteForkRequest; + [WorldStateMessageType.COMMIT_FORK]: CommitForkRequest; [WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS]: BlockShiftRequest; [WorldStateMessageType.UNWIND_BLOCKS]: BlockShiftRequest; @@ -532,6 +538,7 @@ export type WorldStateResponse = { [WorldStateMessageType.CREATE_FORK]: CreateForkResponse; [WorldStateMessageType.DELETE_FORK]: void; + [WorldStateMessageType.COMMIT_FORK]: WorldStateStatusFull; [WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS]: WorldStateStatusFull; [WorldStateMessageType.UNWIND_BLOCKS]: WorldStateStatusFull; diff --git a/yarn-project/world-state/src/native/native_bench.test.ts b/yarn-project/world-state/src/native/native_bench.test.ts index 1d498024f7ec..51faf20ac6f8 100644 --- a/yarn-project/world-state/src/native/native_bench.test.ts +++ b/yarn-project/world-state/src/native/native_bench.test.ts @@ -64,16 +64,16 @@ describe('Native World State: benchmarks', () => { effectsPerTx: number, worldState: NativeWorldStateService, ) => { - const blocks = []; - const fork = await worldState.fork(); - for (let i = 0; i < numBlocks; i++) { - const { block, messages } = await mockBlock(BlockNumber(i + 1), txsPerBlock, fork, effectsPerTx); - blocks.push({ block, messages }); - } - + // Build each block on a separate fork and sync it before building the next. + // Each fork starts from the latest committed state (one fork per block). const startTime = performance.now(); - for (const { block, messages } of blocks) { + for (let i = 0; i < numBlocks; i++) { + const status = await worldState.getStatusSummary(); + const blockNumber = BlockNumber(status.unfinalizedBlockNumber + 1); + const fork = await worldState.fork(); + const { block, messages } = await mockBlock(blockNumber, txsPerBlock, fork, effectsPerTx); + await fork.close(); await worldState.handleL2BlockAndMessages(block, messages); } diff --git a/yarn-project/world-state/src/native/native_world_state.test.ts b/yarn-project/world-state/src/native/native_world_state.test.ts index ea52aa4a20b3..72da0361ed2d 100644 --- a/yarn-project/world-state/src/native/native_world_state.test.ts +++ b/yarn-project/world-state/src/native/native_world_state.test.ts @@ -32,7 +32,7 @@ import { join } from 'path'; import type { WorldStateTreeMapSizes } from '../synchronizer/factory.js'; import { assertSameState, compareChains, mockBlock, mockEmptyBlock } from '../test/utils.js'; import { INITIAL_NULLIFIER_TREE_SIZE, INITIAL_PUBLIC_DATA_TREE_SIZE } from '../world-state-db/merkle_tree_db.js'; -import type { WorldStateStatusSummary } from './message.js'; +import { WorldStateMessageType, type WorldStateStatusSummary } from './message.js'; import { NativeWorldStateService, WORLD_STATE_DB_VERSION, WORLD_STATE_DIR } from './native_world_state.js'; jest.setTimeout(60_000); @@ -219,8 +219,8 @@ describe('NativeWorldState', () => { await timesAsync(5, async i => { const fork = await ws.fork(); - const { block, messages } = await mockBlock(BlockNumber(i + 1), 2, fork); - await ws.handleL2BlockAndMessages(block, messages); + const { block: b, messages: m } = await mockBlock(BlockNumber(i + 2), 2, fork); + await ws.handleL2BlockAndMessages(b, m); await fork.close(); }); @@ -296,15 +296,23 @@ describe('NativeWorldState', () => { publicDataTreeMapSizeKb: 1024, }; const ws = await NativeWorldStateService.new(rollupAddress, dataDir, wsTreeMapSizes); - const initialFork = await ws.fork(); - const { block: block1, messages: messages1 } = await mockBlock(BlockNumber(1), 8, initialFork); - const { block: block2, messages: messages2 } = await mockBlock(BlockNumber(2), 8, initialFork); - const { block: block3, messages: messages3 } = await mockBlock(BlockNumber(3), 8, initialFork); + const fork1 = await ws.fork(); + const { block: block1, messages: messages1 } = await mockBlock(BlockNumber(1), 8, fork1); + await fork1.close(); // The first block should succeed await expect(ws.handleL2BlockAndMessages(block1, messages1)).resolves.toBeDefined(); + // Build blocks 2 and 3 on separate forks at the advanced tip + const fork2 = await ws.fork(); + const { block: block2, messages: messages2 } = await mockBlock(BlockNumber(2), 16, fork2); + await fork2.close(); + + const fork3 = await ws.fork(); + const { block: block3, messages: messages3 } = await mockBlock(BlockNumber(3), 16, fork3); + await fork3.close(); + // The trees should be synched at block 1 const goodSummary = await ws.getStatusSummary(); expect(goodSummary).toEqual({ @@ -314,11 +322,10 @@ describe('NativeWorldState', () => { treesAreSynched: true, } as WorldStateStatusSummary); - // The second block should fail + // The second block should fail (DB too small) await expect(ws.handleL2BlockAndMessages(block2, messages2)).rejects.toThrow(); - // The summary should indicate that the unfinalized block number (that of the archive tree) is 2 - // But it should also tell us that the trees are not synched + // The archive tree committed (small) but other trees failed → permanently out of sync const badSummary = await ws.getStatusSummary(); expect(badSummary).toEqual({ unfinalizedBlockNumber: BlockNumber(2), @@ -327,11 +334,10 @@ describe('NativeWorldState', () => { treesAreSynched: false, } as WorldStateStatusSummary); - // Commits should always fail now, the trees are in an inconsistent state + // Further syncs fail because trees are out of sync await expect(ws.handleL2BlockAndMessages(block2, messages2)).rejects.toThrow('World state trees are out of sync'); await expect(ws.handleL2BlockAndMessages(block3, messages3)).rejects.toThrow('World state trees are out of sync'); - // Creating another world state instance should fail await ws.close(); }); @@ -952,13 +958,13 @@ describe('NativeWorldState', () => { }); it('handles invalid blocks', async () => { - const fork = await ws.fork(); - - // Insert a few blocks + // Insert a few blocks, each on its own fork for (let i = 0; i < 4; i++) { const blockNumber = i + 1; const provenBlock = blockNumber - 2; + const fork = await ws.fork(); const { block, messages } = await mockBlock(BlockNumber(blockNumber), 1, fork); + await fork.close(); const status = await ws.handleL2BlockAndMessages(block, messages); expect(status.summary.unfinalizedBlockNumber).toBe(blockNumber); @@ -976,7 +982,9 @@ describe('NativeWorldState', () => { // Now build an invalid block, see that it is rejected and that we can then insert the correct block { - const { block: block, messages } = await mockBlock(BlockNumber(5), 1, fork); + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(5), 1, fork); + await fork.close(); const invalidBlock = L2Block.fromBuffer(block.toBuffer()); invalidBlock.header.state.partial.nullifierTree.root = Fr.random(); @@ -995,7 +1003,9 @@ describe('NativeWorldState', () => { // Now we push another invalid block, see that it is rejected and check we can unwind to the last proven block { - const { block: block, messages } = await mockBlock(BlockNumber(6), 1, fork); + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(6), 1, fork); + await fork.close(); const invalidBlock = L2Block.fromBuffer(block.toBuffer()); invalidBlock.header.state.partial.nullifierTree.root = Fr.random(); @@ -1079,7 +1089,7 @@ describe('NativeWorldState', () => { let messages: Fr[]; it('retrieves leaf sibling paths', async () => { - const ws = await NativeWorldStateService.new(rollupAddress, dataDir, wsTreeMapSizes); + const ws = await NativeWorldStateService.tmp(); const numBlocks = 2; const txsPerBlock = 2; const noteHashes: Fr[] = []; @@ -1087,7 +1097,7 @@ describe('NativeWorldState', () => { const publicWrites: Buffer[] = []; for (let i = 0; i < numBlocks; i++) { const fork = await ws.fork(); - ({ block, messages } = await mockBlock(BlockNumber(1), txsPerBlock, fork)); + ({ block, messages } = await mockBlock(BlockNumber(i + 1), txsPerBlock, fork)); noteHashes.push(...block.body.txEffects.flatMap(x => x.noteHashes.flatMap(x => x))); nullifiers.push(...block.body.txEffects.flatMap(x => x.nullifiers.flatMap(x => x.toBuffer()))); publicWrites.push(...block.body.txEffects.flatMap(x => x.publicDataWrites.flatMap(x => x.toBuffer()))); @@ -1150,7 +1160,7 @@ describe('NativeWorldState', () => { const txsPerBlock = 2; for (let i = 0; i < numBlocks; i++) { const fork = await ws.fork(); - ({ block, messages } = await mockBlock(BlockNumber(1), txsPerBlock, fork)); + ({ block, messages } = await mockBlock(BlockNumber(i + 1), txsPerBlock, fork)); noteHashes = block.body.txEffects[0].noteHashes.length; nullifiers = block.body.txEffects[0].nullifiers.length; publicTree = block.body.txEffects[0].publicDataWrites.length; @@ -1206,7 +1216,7 @@ describe('NativeWorldState', () => { const statuses = []; for (let i = 0; i < 2; i++) { const fork = await ws.fork(); - ({ block, messages } = await mockBlock(BlockNumber(1), 2, fork)); + ({ block, messages } = await mockBlock(BlockNumber(i + 1), 2, fork)); await fork.close(); const status = await ws.handleL2BlockAndMessages(block, messages); statuses.push(status); @@ -1995,4 +2005,256 @@ describe('NativeWorldState', () => { await fork.close(); }); }); + + describe('registerForkForBlock', () => { + let ws: NativeWorldStateService; + + beforeEach(async () => { + ws = await NativeWorldStateService.tmp(); + }); + + afterEach(async () => { + await ws.close(); + }); + + it('handleL2BlockAndMessages commits a registered fork', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, fork); + + const forkStateRef = await fork.getStateReference(); + + // Register the fork, then sync — should commit the fork instead of recalculating + ws.registerForkForBlock(block.archive.root, fork.forkId); + await ws.handleL2BlockAndMessages(block, messages); + + const committedStateRef = await ws.getCommitted().getStateReference(); + expect(committedStateRef).toEqual(forkStateRef); + }); + + it('commit then create new fork at advanced tip', async () => { + // Build and commit block 1 via registerFork + handleL2BlockAndMessages + const fork1 = await ws.fork(); + const { block: block1, messages: messages1 } = await mockBlock(BlockNumber(1), 1, fork1); + ws.registerForkForBlock(block1.archive.root, (fork1 as any).forkId); + await ws.handleL2BlockAndMessages(block1, messages1); + + // Create new fork at latest (should be at block 1) + const fork2 = await ws.fork(); + const { block: block2, messages: messages2 } = await mockBlock(BlockNumber(2), 1, fork2); + ws.registerForkForBlock(block2.archive.root, (fork2 as any).forkId); + await ws.handleL2BlockAndMessages(block2, messages2); + + // Verify canonical is at block 2 + const status = await ws.getStatusSummary(); + expect(status.unfinalizedBlockNumber).toEqual(2); + }); + + it('falls back to SYNC_BLOCK when no fork is registered', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, fork); + await fork.close(); + + // No registerForkForBlock call — handleL2BlockAndMessages should use SYNC_BLOCK + await ws.handleL2BlockAndMessages(block, messages); + + const status = await ws.getStatusSummary(); + expect(status.unfinalizedBlockNumber).toEqual(1); + }); + + it('fork is destroyed after commit via handleL2BlockAndMessages', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, fork); + + ws.registerForkForBlock(block.archive.root, fork.forkId); + await ws.handleL2BlockAndMessages(block, messages); + + // Fork should be destroyed — operations on it should fail + await expect(fork.getTreeInfo(MerkleTreeId.NULLIFIER_TREE)).rejects.toThrow('Fork not found'); + }); + + it('uses COMMIT_FORK and not SYNC_BLOCK when fork is registered', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, fork); + + // Spy on the native instance to track which message types are sent + const instance = (ws as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + ws.registerForkForBlock(block.archive.root, fork.forkId); + await ws.handleL2BlockAndMessages(block, messages); + + const messageTypes = callSpy.mock.calls.map(call => call[0]); + expect(messageTypes).toContain(WorldStateMessageType.COMMIT_FORK); + expect(messageTypes).not.toContain(WorldStateMessageType.SYNC_BLOCK); + + callSpy.mockRestore(); + }); + + it('uses SYNC_BLOCK and not COMMIT_FORK when no fork is registered', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, fork); + await fork.close(); + + const instance = (ws as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + await ws.handleL2BlockAndMessages(block, messages); + + const messageTypes = callSpy.mock.calls.map(call => call[0]); + expect(messageTypes).toContain(WorldStateMessageType.SYNC_BLOCK); + expect(messageTypes).not.toContain(WorldStateMessageType.COMMIT_FORK); + + callSpy.mockRestore(); + }); + + it('unwind correctly reverses state committed via commit_fork', async () => { + // Commit 4 blocks via COMMIT_FORK + const treeInfosAfterBlock: Awaited>[] = []; + + for (let i = 1; i <= 4; i++) { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(i), 1, fork); + ws.registerForkForBlock(block.archive.root, fork.forkId); + await ws.handleL2BlockAndMessages(block, messages); + treeInfosAfterBlock.push(await ws.getCommitted().getTreeInfo(MerkleTreeId.NULLIFIER_TREE)); + } + + expect((await ws.getStatusSummary()).unfinalizedBlockNumber).toBe(4); + + // Unwind back to block 2 + const unwindStatus = await ws.unwindBlocks(BlockNumber(2)); + expect(unwindStatus.summary.unfinalizedBlockNumber).toBe(2); + + // State matches what it was after block 2 + const treeInfoAfterUnwind = await ws.getCommitted().getTreeInfo(MerkleTreeId.NULLIFIER_TREE); + expect(treeInfoAfterUnwind).toEqual(treeInfosAfterBlock[1]); + + // Can build and commit new blocks on top of the unwound state + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(3), 1, fork); + ws.registerForkForBlock(block.archive.root, fork.forkId); + await ws.handleL2BlockAndMessages(block, messages); + expect((await ws.getStatusSummary()).unfinalizedBlockNumber).toBe(3); + }); + + it('falls back to SYNC_BLOCK when COMMIT_FORK fails (fork deleted before commit)', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, fork); + + ws.registerForkForBlock(block.archive.root, fork.forkId); + await fork.close(); + + const instance = (ws as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + await ws.handleL2BlockAndMessages(block, messages); + + const messageTypes = callSpy.mock.calls.map(call => call[0]); + expect(messageTypes).toContain(WorldStateMessageType.COMMIT_FORK); + expect(messageTypes).toContain(WorldStateMessageType.SYNC_BLOCK); + + const status = await ws.getStatusSummary(); + expect(status.unfinalizedBlockNumber).toEqual(1); + + callSpy.mockRestore(); + }); + + it('registered fork is not used after unwindBlocks', async () => { + // Commit block 1 + const fork1 = await ws.fork(); + const { block: block1, messages: messages1 } = await mockBlock(BlockNumber(1), 1, fork1); + ws.registerForkForBlock(block1.archive.root, fork1.forkId); + await ws.handleL2BlockAndMessages(block1, messages1); + + // Register fork for block 2 but don't sync it + const fork2 = await ws.fork(); + const { block: block2 } = await mockBlock(BlockNumber(2), 1, fork2); + ws.registerForkForBlock(block2.archive.root, fork2.forkId); + + // Unwind to genesis + await ws.unwindBlocks(BlockNumber(0)); + + // Build a new block 1 with different content + const fork3 = await ws.fork(); + const { block: newBlock1, messages: newMessages1 } = await mockBlock(BlockNumber(1), 2, fork3); + await fork3.close(); + + const instance = (ws as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + await ws.handleL2BlockAndMessages(newBlock1, newMessages1); + + const messageTypes = callSpy.mock.calls.map(call => call[0]); + expect(messageTypes).toContain(WorldStateMessageType.SYNC_BLOCK); + expect(messageTypes).not.toContain(WorldStateMessageType.COMMIT_FORK); + expect((await ws.getStatusSummary()).unfinalizedBlockNumber).toEqual(1); + + callSpy.mockRestore(); + }); + + it('commits an empty block via COMMIT_FORK', async () => { + const fork = await ws.fork(); + const { block, messages } = await mockEmptyBlock(BlockNumber(1), fork); + + ws.registerForkForBlock(block.archive.root, fork.forkId); + + const instance = (ws as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + await ws.handleL2BlockAndMessages(block, messages); + + const messageTypes = callSpy.mock.calls.map(call => call[0]); + expect(messageTypes).toContain(WorldStateMessageType.COMMIT_FORK); + expect(messageTypes).not.toContain(WorldStateMessageType.SYNC_BLOCK); + expect((await ws.getStatusSummary()).unfinalizedBlockNumber).toEqual(1); + + callSpy.mockRestore(); + }); + + it('COMMIT_FORK produces the same state as SYNC_BLOCK', async () => { + // Instance A: commit via COMMIT_FORK + const wsA = await NativeWorldStateService.tmp(); + const forkA = await wsA.fork(); + const { block, messages } = await mockBlock(BlockNumber(1), 1, forkA); + wsA.registerForkForBlock(block.archive.root, forkA.forkId); + await wsA.handleL2BlockAndMessages(block, messages); + + // Instance B: commit via SYNC_BLOCK (no fork registration) + const wsB = await NativeWorldStateService.tmp(); + await wsB.handleL2BlockAndMessages(block, messages); + + // State references must be identical + const stateRefA = await wsA.getCommitted().getStateReference(); + const stateRefB = await wsB.getCommitted().getStateReference(); + expect(stateRefA).toEqual(stateRefB); + + const archiveA = await wsA.getCommitted().getTreeInfo(MerkleTreeId.ARCHIVE); + const archiveB = await wsB.getCommitted().getTreeInfo(MerkleTreeId.ARCHIVE); + expect(archiveA).toEqual(archiveB); + + await wsA.close(); + await wsB.close(); + }); + + it('commits 5 sequential blocks via COMMIT_FORK (proposer flow)', async () => { + const blockCount = 5; + const instance = (ws as any).instance; + const callSpy = jest.spyOn(instance, 'call'); + + for (let i = 0; i < blockCount; i++) { + const fork = await ws.fork(); + const { block, messages } = await mockBlock(BlockNumber(i + 1), 1, fork); + ws.registerForkForBlock(block.archive.root, fork.forkId); + await ws.handleL2BlockAndMessages(block, messages); + } + + const messageTypes = callSpy.mock.calls.map(call => call[0]); + expect(messageTypes.filter(t => t === WorldStateMessageType.CREATE_FORK)).toHaveLength(blockCount); + expect(messageTypes.filter(t => t === WorldStateMessageType.COMMIT_FORK)).toHaveLength(blockCount); + expect(messageTypes.filter(t => t === WorldStateMessageType.SYNC_BLOCK)).toHaveLength(0); + expect((await ws.getStatusSummary()).unfinalizedBlockNumber).toEqual(blockCount); + + callSpy.mockRestore(); + }); + }); }); diff --git a/yarn-project/world-state/src/native/native_world_state.ts b/yarn-project/world-state/src/native/native_world_state.ts index 966d5787f8e3..654b2ca13ba2 100644 --- a/yarn-project/world-state/src/native/native_world_state.ts +++ b/yarn-project/world-state/src/native/native_world_state.ts @@ -48,6 +48,8 @@ export class NativeWorldStateService implements MerkleTreeDatabase { protected initialHeader: BlockHeader | undefined; // This is read heavily and only changes when data is persisted, so we cache it private cachedStatusSummary: WorldStateStatusSummary | undefined; + /** The single registered fork awaiting SYNC_BLOCK. Only one fork is active at a time. */ + private registeredFork: { archiveRoot: string; forkId: number } | undefined; protected constructor( protected instance: NativeWorldState, @@ -192,11 +194,36 @@ export class NativeWorldStateService implements MerkleTreeDatabase { ); } + public registerForkForBlock(archiveRoot: Fr, forkId: number): void { + this.registeredFork = { archiveRoot: archiveRoot.toString(), forkId }; + } + public getInitialHeader(): BlockHeader { return this.initialHeader!; } public async handleL2BlockAndMessages(l2Block: L2Block, l1ToL2Messages: Fr[]): Promise { + // Check if a fork already built this block (registered via registerForkForBlock). + // If so, commit the fork directly instead of recalculating via SYNC_BLOCK. + const registered = this.registeredFork; + if (registered && registered.archiveRoot === l2Block.archive.root.toString()) { + this.registeredFork = undefined; + this.log.debug(`Committing registered fork ${registered.forkId} for block ${l2Block.number}`); + try { + return await this.instance.call( + WorldStateMessageType.COMMIT_FORK, + { forkId: registered.forkId, canonical: true as const }, + this.sanitizeAndCacheSummaryFromFull.bind(this), + this.deleteCachedSummary.bind(this), + ); + } catch (err) { + this.log.warn( + `Failed to commit registered fork ${registered.forkId} for block ${l2Block.number}, falling back to SYNC_BLOCK`, + { err }, + ); + } + } + const isFirstBlock = l2Block.indexWithinCheckpoint === 0; if (!isFirstBlock && l1ToL2Messages.length > 0) { throw new Error( @@ -228,7 +255,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { }); try { - return await this.instance.call( + const result = await this.instance.call( WorldStateMessageType.SYNC_BLOCK, { blockNumber: l2Block.number, @@ -243,6 +270,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { this.sanitizeAndCacheSummaryFromFull.bind(this), this.deleteCachedSummary.bind(this), ); + return result; } catch (err) { this.worldStateInstrumentation.incCriticalErrors('synch_pending_block'); throw err; @@ -326,8 +354,10 @@ export class NativeWorldStateService implements MerkleTreeDatabase { * @returns The new WorldStateStatus */ public async unwindBlocks(toBlockNumber: BlockNumber) { + // Clear any registered forks — they're invalid after a reorg. + this.registeredFork = undefined; try { - return await this.instance.call( + const result = await this.instance.call( WorldStateMessageType.UNWIND_BLOCKS, { toBlockNumber, @@ -336,6 +366,7 @@ export class NativeWorldStateService implements MerkleTreeDatabase { this.sanitizeAndCacheSummaryFromFull.bind(this), this.deleteCachedSummary.bind(this), ); + return result; } catch (err) { this.worldStateInstrumentation.incCriticalErrors('prune_pending_block'); throw err; diff --git a/yarn-project/world-state/src/native/native_world_state_instance.ts b/yarn-project/world-state/src/native/native_world_state_instance.ts index ebd3d330faf2..32ef77acfb4b 100644 --- a/yarn-project/world-state/src/native/native_world_state_instance.ts +++ b/yarn-project/world-state/src/native/native_world_state_instance.ts @@ -202,6 +202,14 @@ export class NativeWorldState implements NativeWorldStateInstance { if (messageType === WorldStateMessageType.DELETE_FORK) { await requestQueue.stop(); this.queues.delete(forkId); + } else if (messageType === WorldStateMessageType.COMMIT_FORK) { + // COMMIT_FORK runs on the canonical queue, but we need to clean up the fork's queue + const actualForkId = (body as { forkId: number }).forkId; + const forkQueue = this.queues.get(actualForkId); + if (forkQueue) { + await forkQueue.stop(); + this.queues.delete(actualForkId); + } } return response; } diff --git a/yarn-project/world-state/src/native/world_state_ops_queue.ts b/yarn-project/world-state/src/native/world_state_ops_queue.ts index ac47e9f40c87..26ab74eeff6a 100644 --- a/yarn-project/world-state/src/native/world_state_ops_queue.ts +++ b/yarn-project/world-state/src/native/world_state_ops_queue.ts @@ -35,6 +35,7 @@ export const MUTATING_MSG_TYPES = new Set([ WorldStateMessageType.SYNC_BLOCK, WorldStateMessageType.CREATE_FORK, WorldStateMessageType.DELETE_FORK, + WorldStateMessageType.COMMIT_FORK, WorldStateMessageType.FINALIZE_BLOCKS, WorldStateMessageType.UNWIND_BLOCKS, WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS, diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts index b99c1869b2f3..2efea17c6200 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts @@ -85,6 +85,10 @@ export class ServerWorldStateSynchronizer return this.merkleTreeDb.fork(blockNumber, opts); } + public registerForkForBlock(archiveRoot: Fr, forkId: number): void { + this.merkleTreeDb.registerForkForBlock(archiveRoot, forkId); + } + public backupTo(dstPath: string, compact?: boolean): Promise, string>> { return this.merkleTreeDb.backupTo(dstPath, compact); } diff --git a/yarn-project/world-state/src/test/utils.ts b/yarn-project/world-state/src/test/utils.ts index 7a9a2fd3a9fd..adb31ea62f97 100644 --- a/yarn-project/world-state/src/test/utils.ts +++ b/yarn-project/world-state/src/test/utils.ts @@ -111,18 +111,17 @@ export async function mockBlocks( numTxs: number, worldState: NativeWorldStateService, ) { - const tempFork = await worldState.fork(BlockNumber(from - 1)); - const blocks = []; const messagesArray = []; for (let blockNumber = from; blockNumber < from + count; blockNumber++) { + const tempFork = await worldState.fork(); const { block, messages } = await mockBlock(BlockNumber(blockNumber), numTxs, tempFork); blocks.push(block); messagesArray.push(messages); + worldState.registerForkForBlock(block.archive.root, tempFork.forkId); + await worldState.handleL2BlockAndMessages(block, messages); } - await tempFork.close(); - return { blocks, messages: messagesArray }; }