Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ template <typename Store, typename HashingPolicy> class ContentAddressedAppendOn
ContentAddressedAppendOnlyTree& operator=(ContentAddressedAppendOnlyTree const&& other) = delete;
virtual ~ContentAddressedAppendOnlyTree() = default;

void clear_initialized_from_block() { store_->clear_initialized_from_block(); }

void sync_pruning_meta(const TreeMeta& canonical)
{
TreeMeta meta;
store_->get_meta(meta);
meta.oldestHistoricBlock = canonical.oldestHistoricBlock;
meta.finalizedBlockHeight = canonical.finalizedBlockHeight;
store_->put_meta(meta);
}

/**
* @brief Adds a single value to the end of the tree
* @param value The value to be added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ template <typename LeafValueType> class ContentAddressedCachedTreeStore {
ContentAddressedCachedTreeStore& operator=(ContentAddressedCachedTreeStore const& other) = delete;
ContentAddressedCachedTreeStore& operator=(ContentAddressedCachedTreeStore const&& other) = delete;

void clear_initialized_from_block() { forkConstantData_.initialized_from_block_.reset(); }

/**
* @brief Returns the index of the leaf with a value immediately lower than the value provided
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ WorldStateWrapper::WorldStateWrapper(const Napi::CallbackInfo& info)
WorldStateMessageType::DELETE_FORK,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return delete_fork(obj, buffer); });

_dispatcher.register_target(
WorldStateMessageType::COMMIT_FORK,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return commit_fork(obj, buffer); });

_dispatcher.register_target(
WorldStateMessageType::FINALIZE_BLOCKS,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return set_finalized(obj, buffer); });
Expand Down Expand Up @@ -787,6 +791,20 @@ bool WorldStateWrapper::delete_fork(msgpack::object& obj, msgpack::sbuffer& buf)
return true;
}

bool WorldStateWrapper::commit_fork(msgpack::object& obj, msgpack::sbuffer& buf)
{
TypedMessage<ForkIdOnlyRequest> request;
obj.convert(request);

WorldStateStatusFull status = _ws->commit_fork(request.value.forkId);

MsgHeader header(request.header.messageId);
messaging::TypedMessage<WorldStateStatusFull> resp_msg(WorldStateMessageType::COMMIT_FORK, header, { status });
msgpack::pack(buf, resp_msg);

return true;
}

bool WorldStateWrapper::close(msgpack::object& obj, msgpack::sbuffer& buf)
{
HeaderOnlyMessage request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class WorldStateWrapper : public Napi::ObjectWrap<WorldStateWrapper> {

bool create_fork(msgpack::object& obj, msgpack::sbuffer& buffer);
bool delete_fork(msgpack::object& obj, msgpack::sbuffer& buffer);
bool commit_fork(msgpack::object& obj, msgpack::sbuffer& buffer);

bool close(msgpack::object& obj, msgpack::sbuffer& buffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ enum WorldStateMessageType {

CREATE_FORK,
DELETE_FORK,
COMMIT_FORK,

FINALIZE_BLOCKS,
UNWIND_BLOCKS,
Expand Down
80 changes: 73 additions & 7 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ Fork::SharedPtr WorldState::retrieve_fork(const uint64_t& forkId) const
}
return it->second;
}

Fork::SharedPtr WorldState::retrieve_and_remove_fork(const uint64_t& forkId)
{
std::unique_lock lock(mtx);
auto it = _forks.find(forkId);
if (it == _forks.end()) {
throw std::runtime_error("Fork not found");
}
Fork::SharedPtr fork = it->second;
_forks.erase(it);
return fork;
}

uint64_t WorldState::create_fork(const std::optional<block_number_t>& blockNumber)
{
block_number_t blockNumberForFork = 0;
Expand Down Expand Up @@ -234,13 +247,62 @@ void WorldState::delete_fork(const uint64_t& forkId)
if (forkId == 0) {
throw std::runtime_error("Unable to delete canonical fork");
}
// Retrieving the shared pointer here means we throw if the fork is not available, it also means we are not under a
// lock when we destroy the object
Fork::SharedPtr fork = retrieve_fork(forkId);
{
std::unique_lock lock(mtx);
_forks.erase(forkId);
// Atomically retrieve and remove so no concurrent caller can obtain a reference.
// The local shared_ptr ensures the fork is destroyed outside the lock.
Fork::SharedPtr fork = retrieve_and_remove_fork(forkId);
}

WorldStateStatusFull WorldState::commit_fork(const uint64_t& forkId)
{
if (forkId == CANONICAL_FORK_ID) {
throw std::runtime_error("Cannot commit the canonical fork");
}
validate_trees_are_equally_synched();

// Atomically retrieve and remove so no concurrent caller can obtain a reference.
// The local shared_ptr keeps the fork alive for the duration of this method.
Fork::SharedPtr fork = retrieve_and_remove_fork(forkId);

// Validate tip hasn't moved since fork was created
auto archiveMeta = get_tree_info(WorldStateRevision::committed(), MerkleTreeId::ARCHIVE);
if (archiveMeta.meta.unfinalizedBlockHeight != fork->_blockNumber) {
throw std::runtime_error("Can't commit fork: canonical tip has moved from " +
std::to_string(fork->_blockNumber) + " to " +
std::to_string(archiveMeta.meta.unfinalizedBlockHeight));
}

// Rollback canonical to clear any uncommitted state
rollback();

// Save pruning-related meta from canonical before the fork overwrites it.
// The fork's cached meta may have stale oldestHistoricBlock/finalizedBlockHeight
// from when it was created, so commit_block would overwrite LMDB with stale values.
std::array<TreeMeta, NUM_TREES> canonicalMeta;
get_all_tree_info(WorldStateRevision::committed(), canonicalMeta);

// Clear fork flags so commit_block() is allowed on fork stores
for (auto& [id, tree] : fork->_trees) {
std::visit([](auto&& wrapper) { wrapper.tree->clear_initialized_from_block(); }, tree);
}

// Sync the fork's cached meta with canonical pruning state before committing.
// The fork's cached meta may have stale oldestHistoricBlock/finalizedBlockHeight.
for (auto& entry : fork->_trees) {
std::visit([&](auto&& wrapper) { wrapper.tree->sync_pruning_meta(canonicalMeta[entry.first]); }, entry.second);
}

// Commit fork trees to LMDB
WorldStateStatusFull status;
auto [success, message] = commit(fork, status);
if (!success) {
throw std::runtime_error("Failed to commit fork: " + message);
}

// Rollback canonical so it re-reads the updated LMDB state
rollback();

populate_status_summary(status);
return status;
}

Fork::SharedPtr WorldState::create_new_fork(const block_number_t& blockNumber)
Expand Down Expand Up @@ -520,9 +582,13 @@ void WorldState::update_archive(const StateReference& block_state_ref,
}

std::pair<bool, std::string> WorldState::commit(WorldStateStatusFull& status)
{
return commit(retrieve_fork(CANONICAL_FORK_ID), status);
}

std::pair<bool, std::string> WorldState::commit(Fork::SharedPtr fork, WorldStateStatusFull& status)
{
// NOTE: the calling code is expected to ensure no other reads or writes happen during commit
Fork::SharedPtr fork = retrieve_fork(CANONICAL_FORK_ID);
std::atomic_bool success = true;
std::string message;
Signal signal(static_cast<uint32_t>(fork->_trees.size()));
Expand Down
4 changes: 4 additions & 0 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class WorldState {

uint64_t create_fork(const std::optional<block_number_t>& blockNumber);
void delete_fork(const uint64_t& forkId);
WorldStateStatusFull commit_fork(const uint64_t& forkId);

WorldStateStatusSummary set_finalized_blocks(const block_number_t& toBlockNumber);
WorldStateStatusFull unwind_blocks(const block_number_t& toBlockNumber);
Expand Down Expand Up @@ -311,6 +312,7 @@ class WorldState {
uint64_t maxReaders);

Fork::SharedPtr retrieve_fork(const uint64_t& forkId) const;
Fork::SharedPtr retrieve_and_remove_fork(const uint64_t& forkId);
Fork::SharedPtr create_new_fork(const block_number_t& blockNumber);
void remove_forks_for_block(const block_number_t& blockNumber);

Expand Down Expand Up @@ -343,6 +345,8 @@ class WorldState {

static void populate_status_summary(WorldStateStatusFull& status);

std::pair<bool, std::string> commit(Fork::SharedPtr fork, WorldStateStatusFull& status);

template <typename TreeType>
void commit_tree(TreeDBStats& dbStats,
Signal& signal,
Expand Down
Loading
Loading